Changeset e6a78b9 in mainline for uspace/srv/net/tcp/sock.c


Ignore:
Timestamp:
2012-06-29T15:31:44Z (12 years ago)
Author:
Jakub Jermar <jakub@…>
Branches:
lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
Children:
9432f08
Parents:
34ab31c (diff), 0bbd13e (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge mainline changes.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/srv/net/tcp/sock.c

    r34ab31c re6a78b9  
    5151#include "ucall.h"
    5252
    53 #define FRAGMENT_SIZE 1024
    54 
    5553#define MAX_BACKLOG 128
    5654
     
    6664static void tcp_sock_connection(ipc_callid_t iid, ipc_call_t *icall, void *arg);
    6765static void tcp_sock_cstate_cb(tcp_conn_t *conn, void *arg);
     66static int tcp_sock_recv_fibril(void *arg);
    6867
    6968int tcp_sock_init(void)
    7069{
    71         int rc;
    72 
    7370        socket_ports_initialize(&gsock);
    74 
     71       
    7572        async_set_client_connection(tcp_sock_connection);
    76 
    77         rc = service_register(SERVICE_TCP);
     73       
     74        int rc = service_register(SERVICE_TCP);
    7875        if (rc != EOK)
    7976                return EEXIST;
    80 
     77       
    8178        return EOK;
    8279}
     
    9794        async_exch_t *exch = async_exchange_begin(sock_core->sess);
    9895        async_msg_5(exch, NET_SOCKET_RECEIVED, (sysarg_t)sock_core->socket_id,
    99             FRAGMENT_SIZE, 0, 0, 1);
     96            TCP_SOCK_FRAGMENT_SIZE, 0, 0, 1);
    10097        async_exchange_end(exch);
    10198}
     
    106103        async_exch_t *exch = async_exchange_begin(lsock_core->sess);
    107104        async_msg_5(exch, NET_SOCKET_ACCEPTED, (sysarg_t)lsock_core->socket_id,
    108             FRAGMENT_SIZE, 0, 0, 0);
     105            TCP_SOCK_FRAGMENT_SIZE, 0, 0, 0);
    109106        async_exchange_end(exch);
    110107}
    111108
     109static int tcp_sock_create(tcp_client_t *client, tcp_sockdata_t **rsock)
     110{
     111        tcp_sockdata_t *sock;
     112
     113        log_msg(LVL_DEBUG, "tcp_sock_create()");
     114        *rsock = NULL;
     115
     116        sock = calloc(sizeof(tcp_sockdata_t), 1);
     117        if (sock == NULL)
     118                return ENOMEM;
     119
     120        fibril_mutex_initialize(&sock->lock);
     121        sock->client = client;
     122
     123        sock->recv_buffer_used = 0;
     124        sock->recv_error = TCP_EOK;
     125        fibril_mutex_initialize(&sock->recv_buffer_lock);
     126        fibril_condvar_initialize(&sock->recv_buffer_cv);
     127        list_initialize(&sock->ready);
     128
     129        *rsock = sock;
     130        return EOK;
     131}
     132
     133static void tcp_sock_uncreate(tcp_sockdata_t *sock)
     134{
     135        log_msg(LVL_DEBUG, "tcp_sock_uncreate()");
     136        free(sock);
     137}
     138
     139static int tcp_sock_finish_setup(tcp_sockdata_t *sock, int *sock_id)
     140{
     141        socket_core_t *sock_core;
     142        int rc;
     143
     144        log_msg(LVL_DEBUG, "tcp_sock_finish_setup()");
     145
     146        sock->recv_fibril = fibril_create(tcp_sock_recv_fibril, sock);
     147        if (sock->recv_fibril == 0)
     148                return ENOMEM;
     149
     150        rc = socket_create(&sock->client->sockets, sock->client->sess,
     151            sock, sock_id);
     152
     153        if (rc != EOK) {
     154                fibril_destroy(sock->recv_fibril);
     155                sock->recv_fibril = 0;
     156                return rc;
     157        }
     158
     159        sock_core = socket_cores_find(&sock->client->sockets, *sock_id);
     160        assert(sock_core != NULL);
     161        sock->sock_core = sock_core;
     162
     163        return EOK;
     164}
     165
    112166static void tcp_sock_socket(tcp_client_t *client, ipc_callid_t callid, ipc_call_t call)
    113167{
    114168        tcp_sockdata_t *sock;
    115         socket_core_t *sock_core;
    116169        int sock_id;
    117170        int rc;
     
    119172
    120173        log_msg(LVL_DEBUG, "tcp_sock_socket()");
    121         sock = calloc(sizeof(tcp_sockdata_t), 1);
    122         if (sock == NULL) {
    123                 async_answer_0(callid, ENOMEM);
    124                 return;
    125         }
    126 
    127         fibril_mutex_initialize(&sock->lock);
    128         sock->client = client;
     174
     175        rc = tcp_sock_create(client, &sock);
     176        if (rc != EOK) {
     177                async_answer_0(callid, rc);
     178                return;
     179        }
     180
    129181        sock->laddr.ipv4 = TCP_IPV4_ANY;
    130182        sock->lconn = NULL;
    131183        sock->backlog = 0;
    132         list_initialize(&sock->ready);
    133184
    134185        sock_id = SOCKET_GET_SOCKET_ID(call);
    135         rc = socket_create(&client->sockets, client->sess, sock, &sock_id);
     186        rc = tcp_sock_finish_setup(sock, &sock_id);
    136187        if (rc != EOK) {
     188                tcp_sock_uncreate(sock);
    137189                async_answer_0(callid, rc);
    138190                return;
    139191        }
    140192
    141         sock_core = socket_cores_find(&client->sockets, sock_id);
    142         assert(sock_core != NULL);
    143         sock->sock_core = sock_core;
    144 
    145193        SOCKET_SET_SOCKET_ID(answer, sock_id);
    146194
    147         SOCKET_SET_DATA_FRAGMENT_SIZE(answer, FRAGMENT_SIZE);
     195        SOCKET_SET_DATA_FRAGMENT_SIZE(answer, TCP_SOCK_FRAGMENT_SIZE);
    148196        SOCKET_SET_HEADER_SIZE(answer, sizeof(tcp_header_t));
    149197       
     
    361409        }
    362410
     411        if (rc == EOK)
     412                fibril_add_ready(socket->recv_fibril);
     413
    363414        async_answer_0(callid, rc);
    364 
    365         /* Push one fragment notification to client's queue */
    366         tcp_sock_notify_data(sock_core);
    367         log_msg(LVL_DEBUG, "tcp_sock_connect(): notify conn\n");
    368415}
    369416
     
    374421        int asock_id;
    375422        socket_core_t *sock_core;
    376         socket_core_t *asock_core;
    377423        tcp_sockdata_t *socket;
    378424        tcp_sockdata_t *asocket;
     
    444490        /* Allocate socket for accepted connection */
    445491
    446         log_msg(LVL_DEBUG, "tcp_sock_accept(): allocate asocket\n");
    447         asocket = calloc(sizeof(tcp_sockdata_t), 1);
    448         if (asocket == NULL) {
    449                 fibril_mutex_unlock(&socket->lock);
    450                 async_answer_0(callid, ENOMEM);
    451                 return;
    452         }
    453 
    454         fibril_mutex_initialize(&asocket->lock);
    455         asocket->client = client;
     492        rc = tcp_sock_create(client, &asocket);
     493        if (rc != EOK) {
     494                fibril_mutex_unlock(&socket->lock);
     495                async_answer_0(callid, rc);
     496                return;
     497        }
     498
    456499        asocket->conn = conn;
    457500        log_msg(LVL_DEBUG, "tcp_sock_accept():create asocket\n");
    458501
    459         rc = socket_create(&client->sockets, client->sess, asocket, &asock_id);
     502        rc = tcp_sock_finish_setup(asocket, &asock_id);
    460503        if (rc != EOK) {
     504                tcp_sock_uncreate(asocket);
    461505                fibril_mutex_unlock(&socket->lock);
    462506                async_answer_0(callid, rc);
    463507                return;
    464508        }
     509
     510        fibril_add_ready(asocket->recv_fibril);
     511
    465512        log_msg(LVL_DEBUG, "tcp_sock_accept(): find acore\n");
    466513
    467         asock_core = socket_cores_find(&client->sockets, asock_id);
    468         assert(asock_core != NULL);
    469 
    470         SOCKET_SET_DATA_FRAGMENT_SIZE(answer, FRAGMENT_SIZE);
     514        SOCKET_SET_DATA_FRAGMENT_SIZE(answer, TCP_SOCK_FRAGMENT_SIZE);
    471515        SOCKET_SET_SOCKET_ID(answer, asock_id);
    472516        SOCKET_SET_ADDRESS_LENGTH(answer, sizeof(struct sockaddr_in));
    473517       
    474         async_answer_3(callid, asock_core->socket_id,
     518        async_answer_3(callid, asocket->sock_core->socket_id,
    475519            IPC_GET_ARG1(answer), IPC_GET_ARG2(answer),
    476520            IPC_GET_ARG3(answer));
     
    478522        /* Push one fragment notification to client's queue */
    479523        log_msg(LVL_DEBUG, "tcp_sock_accept(): notify data\n");
    480         tcp_sock_notify_data(asock_core);
    481524        fibril_mutex_unlock(&socket->lock);
    482525}
     
    492535        ipc_callid_t wcallid;
    493536        size_t length;
    494         uint8_t buffer[FRAGMENT_SIZE];
     537        uint8_t buffer[TCP_SOCK_FRAGMENT_SIZE];
    495538        tcp_error_t trc;
    496539        int rc;
     
    523566                }
    524567
    525                 if (length > FRAGMENT_SIZE)
    526                         length = FRAGMENT_SIZE;
     568                if (length > TCP_SOCK_FRAGMENT_SIZE)
     569                        length = TCP_SOCK_FRAGMENT_SIZE;
    527570
    528571                rc = async_data_write_finalize(wcallid, buffer, length);
     
    560603
    561604        IPC_SET_ARG1(answer, 0);
    562         SOCKET_SET_DATA_FRAGMENT_SIZE(answer, FRAGMENT_SIZE);
     605        SOCKET_SET_DATA_FRAGMENT_SIZE(answer, TCP_SOCK_FRAGMENT_SIZE);
    563606        async_answer_2(callid, EOK, IPC_GET_ARG1(answer),
    564607            IPC_GET_ARG2(answer));
     
    581624        ipc_call_t answer;
    582625        ipc_callid_t rcallid;
    583         uint8_t buffer[FRAGMENT_SIZE];
    584626        size_t data_len;
    585         xflags_t xflags;
    586         tcp_error_t trc;
    587627        struct sockaddr_in addr;
    588628        tcp_sock_t *rsock;
     
    611651        (void)flags;
    612652
    613         trc = tcp_uc_receive(socket->conn, buffer, FRAGMENT_SIZE, &data_len,
    614             &xflags);
    615         log_msg(LVL_DEBUG, "**** tcp_uc_receive done");
    616 
    617         switch (trc) {
     653        log_msg(LVL_DEBUG, "tcp_sock_recvfrom(): lock recv_buffer_lock");
     654        fibril_mutex_lock(&socket->recv_buffer_lock);
     655        while (socket->recv_buffer_used == 0 && socket->recv_error == TCP_EOK) {
     656                log_msg(LVL_DEBUG, "wait for recv_buffer_cv + recv_buffer_used != 0");
     657                fibril_condvar_wait(&socket->recv_buffer_cv,
     658                    &socket->recv_buffer_lock);
     659        }
     660
     661        log_msg(LVL_DEBUG, "Got data in sock recv_buffer");
     662
     663        data_len = socket->recv_buffer_used;
     664        rc = socket->recv_error;
     665
     666        switch (socket->recv_error) {
    618667        case TCP_EOK:
    619668                rc = EOK;
     
    630679        }
    631680
    632         log_msg(LVL_DEBUG, "**** tcp_uc_receive -> %d", rc);
     681        log_msg(LVL_DEBUG, "**** recv result -> %d", rc);
    633682        if (rc != EOK) {
     683                fibril_mutex_unlock(&socket->recv_buffer_lock);
    634684                fibril_mutex_unlock(&socket->lock);
    635685                async_answer_0(callid, rc);
     
    646696                log_msg(LVL_DEBUG, "addr read receive");
    647697                if (!async_data_read_receive(&rcallid, &addr_length)) {
     698                        fibril_mutex_unlock(&socket->recv_buffer_lock);
    648699                        fibril_mutex_unlock(&socket->lock);
    649700                        async_answer_0(callid, EINVAL);
     
    657708                rc = async_data_read_finalize(rcallid, &addr, addr_length);
    658709                if (rc != EOK) {
     710                        fibril_mutex_unlock(&socket->recv_buffer_lock);
    659711                        fibril_mutex_unlock(&socket->lock);
    660712                        async_answer_0(callid, EINVAL);
     
    665717        log_msg(LVL_DEBUG, "data read receive");
    666718        if (!async_data_read_receive(&rcallid, &length)) {
     719                fibril_mutex_unlock(&socket->recv_buffer_lock);
    667720                fibril_mutex_unlock(&socket->lock);
    668721                async_answer_0(callid, EINVAL);
     
    674727
    675728        log_msg(LVL_DEBUG, "data read finalize");
    676         rc = async_data_read_finalize(rcallid, buffer, length);
     729        rc = async_data_read_finalize(rcallid, socket->recv_buffer, length);
     730
     731        socket->recv_buffer_used -= length;
     732        log_msg(LVL_DEBUG, "tcp_sock_recvfrom: %zu left in buffer",
     733            socket->recv_buffer_used);
     734        if (socket->recv_buffer_used > 0) {
     735                memmove(socket->recv_buffer, socket->recv_buffer + length,
     736                    socket->recv_buffer_used);
     737                tcp_sock_notify_data(socket->sock_core);
     738        }
     739
     740        fibril_condvar_broadcast(&socket->recv_buffer_cv);
    677741
    678742        if (length < data_len && rc == EOK)
     
    681745        SOCKET_SET_READ_DATA_LENGTH(answer, length);
    682746        async_answer_1(callid, EOK, IPC_GET_ARG1(answer));
    683        
    684         /* Push one fragment notification to client's queue */
    685         tcp_sock_notify_data(sock_core);
     747
     748        fibril_mutex_unlock(&socket->recv_buffer_lock);
    686749        fibril_mutex_unlock(&socket->lock);
    687750}
     
    694757        tcp_error_t trc;
    695758        int rc;
    696         uint8_t buffer[FRAGMENT_SIZE];
    697         size_t data_len;
    698         xflags_t xflags;
    699759
    700760        log_msg(LVL_DEBUG, "tcp_sock_close()");
     
    717777                        return;
    718778                }
    719 
    720                 /* Drain incoming data. This should really be done in the background. */
    721                 do {
    722                         trc = tcp_uc_receive(socket->conn, buffer,
    723                             FRAGMENT_SIZE, &data_len, &xflags);
    724                 } while (trc == TCP_EOK);
    725 
    726                 tcp_uc_delete(socket->conn);
    727779        }
    728780
     
    776828        tcp_sock_notify_aconn(socket->sock_core);
    777829        fibril_mutex_unlock(&socket->lock);
     830}
     831
     832static int tcp_sock_recv_fibril(void *arg)
     833{
     834        tcp_sockdata_t *sock = (tcp_sockdata_t *)arg;
     835        size_t data_len;
     836        xflags_t xflags;
     837        tcp_error_t trc;
     838
     839        log_msg(LVL_DEBUG, "tcp_sock_recv_fibril()");
     840
     841        while (true) {
     842                log_msg(LVL_DEBUG, "call tcp_uc_receive()");
     843                fibril_mutex_lock(&sock->recv_buffer_lock);
     844                while (sock->recv_buffer_used != 0)
     845                        fibril_condvar_wait(&sock->recv_buffer_cv,
     846                            &sock->recv_buffer_lock);
     847
     848                trc = tcp_uc_receive(sock->conn, sock->recv_buffer,
     849                    TCP_SOCK_FRAGMENT_SIZE, &data_len, &xflags);
     850
     851                if (trc != TCP_EOK) {
     852                        sock->recv_error = trc;
     853                        fibril_condvar_broadcast(&sock->recv_buffer_cv);
     854                        fibril_mutex_unlock(&sock->recv_buffer_lock);
     855                        tcp_sock_notify_data(sock->sock_core);
     856                        break;
     857                }
     858
     859                log_msg(LVL_DEBUG, "got data - broadcast recv_buffer_cv");
     860
     861                sock->recv_buffer_used = data_len;
     862                fibril_condvar_broadcast(&sock->recv_buffer_cv);
     863                fibril_mutex_unlock(&sock->recv_buffer_lock);
     864                tcp_sock_notify_data(sock->sock_core);
     865        }
     866
     867        tcp_uc_delete(sock->conn);
     868
     869        return 0;
    778870}
    779871
Note: See TracChangeset for help on using the changeset viewer.