Fork us on GitHub Follow us on Facebook Follow us on Twitter

Changeset d786dea9 in mainline


Ignore:
Timestamp:
2012-05-11T09:44:14Z (9 years ago)
Author:
Jiri Svoboda <jiri@…>
Branches:
master
Children:
9094c0f
Parents:
fce7b43
Message:

Use a receive fibril in TCP socket provider to make reception non-blocking. This is more of a hack, the whole code needs to be revamped.

Location:
uspace/srv/net/tcp
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • uspace/srv/net/tcp/sock.c

    rfce7b43 rd786dea9  
    5151#include "ucall.h"
    5252
    53 #define FRAGMENT_SIZE 1024
    54 
    5553#define MAX_BACKLOG 128
    5654
     
    6664static void tcp_sock_connection(ipc_callid_t iid, ipc_call_t *icall, void *arg);
    6765static void tcp_sock_cstate_cb(tcp_conn_t *conn, void *arg);
     66static int tcp_sock_recv_fibril(void *arg);
    6867
    6968int tcp_sock_init(void)
     
    9796        async_exch_t *exch = async_exchange_begin(sock_core->sess);
    9897        async_msg_5(exch, NET_SOCKET_RECEIVED, (sysarg_t)sock_core->socket_id,
    99             FRAGMENT_SIZE, 0, 0, 1);
     98            TCP_SOCK_FRAGMENT_SIZE, 0, 0, 1);
    10099        async_exchange_end(exch);
    101100}
     
    106105        async_exch_t *exch = async_exchange_begin(lsock_core->sess);
    107106        async_msg_5(exch, NET_SOCKET_ACCEPTED, (sysarg_t)lsock_core->socket_id,
    108             FRAGMENT_SIZE, 0, 0, 0);
     107            TCP_SOCK_FRAGMENT_SIZE, 0, 0, 0);
    109108        async_exchange_end(exch);
    110109}
    111110
     111static int tcp_sock_create(tcp_client_t *client, tcp_sockdata_t **rsock)
     112{
     113        tcp_sockdata_t *sock;
     114
     115        log_msg(LVL_DEBUG, "tcp_sock_create()");
     116        *rsock = NULL;
     117
     118        sock = calloc(sizeof(tcp_sockdata_t), 1);
     119        if (sock == NULL)
     120                return ENOMEM;
     121
     122        fibril_mutex_initialize(&sock->lock);
     123        sock->client = client;
     124
     125        sock->recv_buffer_used = 0;
     126        sock->recv_error = TCP_EOK;
     127        fibril_mutex_initialize(&sock->recv_buffer_lock);
     128        fibril_condvar_initialize(&sock->recv_buffer_cv);
     129        list_initialize(&sock->ready);
     130
     131        *rsock = sock;
     132        return EOK;
     133}
     134
     135static void tcp_sock_uncreate(tcp_sockdata_t *sock)
     136{
     137        log_msg(LVL_DEBUG, "tcp_sock_uncreate()");
     138        free(sock);
     139}
     140
     141static int tcp_sock_finish_setup(tcp_sockdata_t *sock, int *sock_id)
     142{
     143        socket_core_t *sock_core;
     144        int rc;
     145
     146        log_msg(LVL_DEBUG, "tcp_sock_finish_setup()");
     147
     148        sock->recv_fibril = fibril_create(tcp_sock_recv_fibril, sock);
     149        if (sock->recv_fibril == 0)
     150                return ENOMEM;
     151
     152        rc = socket_create(&sock->client->sockets, sock->client->sess,
     153            sock, sock_id);
     154
     155        if (rc != EOK)
     156                return rc;
     157
     158        sock_core = socket_cores_find(&sock->client->sockets, *sock_id);
     159        assert(sock_core != NULL);
     160        sock->sock_core = sock_core;
     161
     162        return EOK;
     163}
     164
    112165static void tcp_sock_socket(tcp_client_t *client, ipc_callid_t callid, ipc_call_t call)
    113166{
    114167        tcp_sockdata_t *sock;
    115         socket_core_t *sock_core;
    116168        int sock_id;
    117169        int rc;
     
    119171
    120172        log_msg(LVL_DEBUG, "tcp_sock_socket()");
    121         sock = calloc(sizeof(tcp_sockdata_t), 1);
    122         if (sock == NULL) {
    123                 async_answer_0(callid, ENOMEM);
    124                 return;
    125         }
    126 
    127         fibril_mutex_initialize(&sock->lock);
    128         sock->client = client;
     173
     174        rc = tcp_sock_create(client, &sock);
     175        if (rc != EOK) {
     176                async_answer_0(callid, rc);
     177                return;
     178        }
     179
    129180        sock->laddr.ipv4 = TCP_IPV4_ANY;
    130181        sock->lconn = NULL;
    131182        sock->backlog = 0;
    132         list_initialize(&sock->ready);
    133183
    134184        sock_id = SOCKET_GET_SOCKET_ID(call);
    135         rc = socket_create(&client->sockets, client->sess, sock, &sock_id);
     185        rc = tcp_sock_finish_setup(sock, &sock_id);
    136186        if (rc != EOK) {
     187                tcp_sock_uncreate(sock);
    137188                async_answer_0(callid, rc);
    138189                return;
    139190        }
    140191
    141         sock_core = socket_cores_find(&client->sockets, sock_id);
    142         assert(sock_core != NULL);
    143         sock->sock_core = sock_core;
    144 
    145192        SOCKET_SET_SOCKET_ID(answer, sock_id);
    146193
    147         SOCKET_SET_DATA_FRAGMENT_SIZE(answer, FRAGMENT_SIZE);
     194        SOCKET_SET_DATA_FRAGMENT_SIZE(answer, TCP_SOCK_FRAGMENT_SIZE);
    148195        SOCKET_SET_HEADER_SIZE(answer, sizeof(tcp_header_t));
    149196       
     
    361408        }
    362409
     410        if (rc == EOK)
     411                fibril_add_ready(socket->recv_fibril);
     412
    363413        async_answer_0(callid, rc);
    364 
    365         /* Push one fragment notification to client's queue */
    366         tcp_sock_notify_data(sock_core);
    367         log_msg(LVL_DEBUG, "tcp_sock_connect(): notify conn\n");
    368414}
    369415
     
    374420        int asock_id;
    375421        socket_core_t *sock_core;
    376         socket_core_t *asock_core;
    377422        tcp_sockdata_t *socket;
    378423        tcp_sockdata_t *asocket;
     
    444489        /* Allocate socket for accepted connection */
    445490
    446         log_msg(LVL_DEBUG, "tcp_sock_accept(): allocate asocket\n");
    447         asocket = calloc(sizeof(tcp_sockdata_t), 1);
    448         if (asocket == NULL) {
    449                 fibril_mutex_unlock(&socket->lock);
    450                 async_answer_0(callid, ENOMEM);
    451                 return;
    452         }
    453 
    454         fibril_mutex_initialize(&asocket->lock);
    455         asocket->client = client;
     491        rc = tcp_sock_create(client, &asocket);
     492        if (rc != EOK) {
     493                fibril_mutex_unlock(&socket->lock);
     494                async_answer_0(callid, rc);
     495                return;
     496        }
     497
    456498        asocket->conn = conn;
    457499        log_msg(LVL_DEBUG, "tcp_sock_accept():create asocket\n");
    458500
    459         rc = socket_create(&client->sockets, client->sess, asocket, &asock_id);
     501        rc = tcp_sock_finish_setup(asocket, &asock_id);
    460502        if (rc != EOK) {
     503                tcp_sock_uncreate(asocket);
    461504                fibril_mutex_unlock(&socket->lock);
    462505                async_answer_0(callid, rc);
    463506                return;
    464507        }
     508
     509        fibril_add_ready(asocket->recv_fibril);
     510
    465511        log_msg(LVL_DEBUG, "tcp_sock_accept(): find acore\n");
    466512
    467         asock_core = socket_cores_find(&client->sockets, asock_id);
    468         assert(asock_core != NULL);
    469 
    470         SOCKET_SET_DATA_FRAGMENT_SIZE(answer, FRAGMENT_SIZE);
     513        SOCKET_SET_DATA_FRAGMENT_SIZE(answer, TCP_SOCK_FRAGMENT_SIZE);
    471514        SOCKET_SET_SOCKET_ID(answer, asock_id);
    472515        SOCKET_SET_ADDRESS_LENGTH(answer, sizeof(struct sockaddr_in));
    473516       
    474         async_answer_3(callid, asock_core->socket_id,
     517        async_answer_3(callid, asocket->sock_core->socket_id,
    475518            IPC_GET_ARG1(answer), IPC_GET_ARG2(answer),
    476519            IPC_GET_ARG3(answer));
     
    478521        /* Push one fragment notification to client's queue */
    479522        log_msg(LVL_DEBUG, "tcp_sock_accept(): notify data\n");
    480         tcp_sock_notify_data(asock_core);
    481523        fibril_mutex_unlock(&socket->lock);
    482524}
     
    492534        ipc_callid_t wcallid;
    493535        size_t length;
    494         uint8_t buffer[FRAGMENT_SIZE];
     536        uint8_t buffer[TCP_SOCK_FRAGMENT_SIZE];
    495537        tcp_error_t trc;
    496538        int rc;
     
    523565                }
    524566
    525                 if (length > FRAGMENT_SIZE)
    526                         length = FRAGMENT_SIZE;
     567                if (length > TCP_SOCK_FRAGMENT_SIZE)
     568                        length = TCP_SOCK_FRAGMENT_SIZE;
    527569
    528570                rc = async_data_write_finalize(wcallid, buffer, length);
     
    560602
    561603        IPC_SET_ARG1(answer, 0);
    562         SOCKET_SET_DATA_FRAGMENT_SIZE(answer, FRAGMENT_SIZE);
     604        SOCKET_SET_DATA_FRAGMENT_SIZE(answer, TCP_SOCK_FRAGMENT_SIZE);
    563605        async_answer_2(callid, EOK, IPC_GET_ARG1(answer),
    564606            IPC_GET_ARG2(answer));
     
    581623        ipc_call_t answer;
    582624        ipc_callid_t rcallid;
    583         uint8_t buffer[FRAGMENT_SIZE];
    584625        size_t data_len;
    585         xflags_t xflags;
    586         tcp_error_t trc;
    587626        struct sockaddr_in addr;
    588627        tcp_sock_t *rsock;
     
    611650        (void)flags;
    612651
    613         trc = tcp_uc_receive(socket->conn, buffer, FRAGMENT_SIZE, &data_len,
    614             &xflags);
    615         log_msg(LVL_DEBUG, "**** tcp_uc_receive done");
    616 
    617         switch (trc) {
     652        log_msg(LVL_DEBUG, "tcp_sock_recvfrom(): lock recv_buffer_lock");
     653        fibril_mutex_lock(&socket->recv_buffer_lock);
     654        while (socket->recv_buffer_used == 0) {
     655                log_msg(LVL_DEBUG, "wait for recv_buffer_cv + recv_buffer_used != 0");
     656                fibril_condvar_wait(&socket->recv_buffer_cv,
     657                    &socket->recv_buffer_lock);
     658        }
     659
     660        log_msg(LVL_DEBUG, "Got data in sock recv_buffer");
     661
     662        data_len = socket->recv_buffer_used;
     663        rc = socket->recv_error;
     664
     665        switch (socket->recv_error) {
    618666        case TCP_EOK:
    619667                rc = EOK;
     
    630678        }
    631679
    632         log_msg(LVL_DEBUG, "**** tcp_uc_receive -> %d", rc);
     680        log_msg(LVL_DEBUG, "**** recv result -> %d", rc);
    633681        if (rc != EOK) {
     682                fibril_mutex_unlock(&socket->recv_buffer_lock);
    634683                fibril_mutex_unlock(&socket->lock);
    635684                async_answer_0(callid, rc);
     
    646695                log_msg(LVL_DEBUG, "addr read receive");
    647696                if (!async_data_read_receive(&rcallid, &addr_length)) {
     697                        fibril_mutex_unlock(&socket->recv_buffer_lock);
    648698                        fibril_mutex_unlock(&socket->lock);
    649699                        async_answer_0(callid, EINVAL);
     
    657707                rc = async_data_read_finalize(rcallid, &addr, addr_length);
    658708                if (rc != EOK) {
     709                        fibril_mutex_unlock(&socket->recv_buffer_lock);
    659710                        fibril_mutex_unlock(&socket->lock);
    660711                        async_answer_0(callid, EINVAL);
     
    665716        log_msg(LVL_DEBUG, "data read receive");
    666717        if (!async_data_read_receive(&rcallid, &length)) {
     718                fibril_mutex_unlock(&socket->recv_buffer_lock);
    667719                fibril_mutex_unlock(&socket->lock);
    668720                async_answer_0(callid, EINVAL);
     
    674726
    675727        log_msg(LVL_DEBUG, "data read finalize");
    676         rc = async_data_read_finalize(rcallid, buffer, length);
     728        rc = async_data_read_finalize(rcallid, socket->recv_buffer, length);
     729
     730        socket->recv_buffer_used = 0;
     731        fibril_condvar_broadcast(&socket->recv_buffer_cv);
    677732
    678733        if (length < data_len && rc == EOK)
     
    681736        SOCKET_SET_READ_DATA_LENGTH(answer, length);
    682737        async_answer_1(callid, EOK, IPC_GET_ARG1(answer));
    683        
    684         /* Push one fragment notification to client's queue */
    685         tcp_sock_notify_data(sock_core);
     738
     739        fibril_mutex_unlock(&socket->recv_buffer_lock);
    686740        fibril_mutex_unlock(&socket->lock);
    687741}
     
    694748        tcp_error_t trc;
    695749        int rc;
    696         uint8_t buffer[FRAGMENT_SIZE];
    697         size_t data_len;
    698         xflags_t xflags;
    699750
    700751        log_msg(LVL_DEBUG, "tcp_sock_close()");
     
    717768                        return;
    718769                }
    719 
    720                 /* Drain incoming data. This should really be done in the background. */
    721                 do {
    722                         trc = tcp_uc_receive(socket->conn, buffer,
    723                             FRAGMENT_SIZE, &data_len, &xflags);
    724                 } while (trc == TCP_EOK);
    725 
    726                 tcp_uc_delete(socket->conn);
    727770        }
    728771
     
    776819        tcp_sock_notify_aconn(socket->sock_core);
    777820        fibril_mutex_unlock(&socket->lock);
     821}
     822
     823static int tcp_sock_recv_fibril(void *arg)
     824{
     825        tcp_sockdata_t *sock = (tcp_sockdata_t *)arg;
     826        size_t data_len;
     827        xflags_t xflags;
     828        tcp_error_t trc;
     829
     830        log_msg(LVL_DEBUG, "tcp_sock_recv_fibril()");
     831
     832        while (true) {
     833                log_msg(LVL_DEBUG, "call tcp_uc_receive()");
     834                fibril_mutex_lock(&sock->recv_buffer_lock);
     835                while (sock->recv_buffer_used != 0)
     836                        fibril_condvar_wait(&sock->recv_buffer_cv,
     837                            &sock->recv_buffer_lock);
     838
     839                trc = tcp_uc_receive(sock->conn, sock->recv_buffer,
     840                    TCP_SOCK_FRAGMENT_SIZE, &data_len, &xflags);
     841
     842                if (trc != TCP_EOK) {
     843                        sock->recv_error = trc;
     844                        fibril_mutex_unlock(&sock->recv_buffer_lock);
     845                        break;
     846                }
     847
     848                log_msg(LVL_DEBUG, "got data - broadcast recv_buffer_cv");
     849
     850                sock->recv_buffer_used = data_len;
     851                fibril_mutex_unlock(&sock->recv_buffer_lock);
     852                fibril_condvar_broadcast(&sock->recv_buffer_cv);
     853                tcp_sock_notify_data(sock->sock_core);
     854        }
     855
     856        tcp_uc_delete(sock->conn);
     857
     858        return 0;
    778859}
    779860
  • uspace/srv/net/tcp/tcp_type.h

    rfce7b43 rd786dea9  
    3939#include <async.h>
    4040#include <bool.h>
     41#include <fibril.h>
    4142#include <fibril_synch.h>
    4243#include <socket_core.h>
     
    331332} tcp_client_t;
    332333
     334#define TCP_SOCK_FRAGMENT_SIZE 1024
     335
    333336typedef struct tcp_sockdata {
    334337        /** Lock */
     
    348351        /** List of connections (from lconn) that are ready to be accepted */
    349352        list_t ready;
     353        /** Receiving fibril */
     354        fid_t recv_fibril;
     355        uint8_t recv_buffer[TCP_SOCK_FRAGMENT_SIZE];
     356        size_t recv_buffer_used;
     357        fibril_mutex_t recv_buffer_lock;
     358        fibril_condvar_t recv_buffer_cv;
     359        tcp_error_t recv_error;
    350360} tcp_sockdata_t;
    351361
Note: See TracChangeset for help on using the changeset viewer.