The Design and Implementation of the FreeBSD Operating System, Second Edition
Now available: The Design and Implementation of the FreeBSD Operating System (Second Edition)


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]

FreeBSD/Linux Kernel Cross Reference
sys/ofed/drivers/infiniband/ulp/sdp/sdp_rx.c

Version: -  FREEBSD  -  FREEBSD-13-STABLE  -  FREEBSD-13-0  -  FREEBSD-12-STABLE  -  FREEBSD-12-0  -  FREEBSD-11-STABLE  -  FREEBSD-11-0  -  FREEBSD-10-STABLE  -  FREEBSD-10-0  -  FREEBSD-9-STABLE  -  FREEBSD-9-0  -  FREEBSD-8-STABLE  -  FREEBSD-8-0  -  FREEBSD-7-STABLE  -  FREEBSD-7-0  -  FREEBSD-6-STABLE  -  FREEBSD-6-0  -  FREEBSD-5-STABLE  -  FREEBSD-5-0  -  FREEBSD-4-STABLE  -  FREEBSD-3-STABLE  -  FREEBSD22  -  l41  -  OPENBSD  -  linux-2.6  -  MK84  -  PLAN9  -  xnu-8792 
SearchContext: -  none  -  3  -  10 

    1 /*-
    2  * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
    3  *
    4  * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
    5  *
    6  * This software is available to you under a choice of one of two
    7  * licenses.  You may choose to be licensed under the terms of the GNU
    8  * General Public License (GPL) Version 2, available from the file
    9  * COPYING in the main directory of this source tree, or the
   10  * OpenIB.org BSD license below:
   11  *
   12  *     Redistribution and use in source and binary forms, with or
   13  *     without modification, are permitted provided that the following
   14  *     conditions are met:
   15  *
   16  *      - Redistributions of source code must retain the above
   17  *        copyright notice, this list of conditions and the following
   18  *        disclaimer.
   19  *
   20  *      - Redistributions in binary form must reproduce the above
   21  *        copyright notice, this list of conditions and the following
   22  *        disclaimer in the documentation and/or other materials
   23  *        provided with the distribution.
   24  *
   25  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
   26  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
   27  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
   28  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
   29  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
   30  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   31  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
   32  * SOFTWARE.
   33  */
   34 #include "sdp.h"
   35 
   36 SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
   37                 "Receive buffer initial size in bytes.");
   38 SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
   39                 "Receive buffer size scale factor.");
   40 
   41 /* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
   42 static void
   43 sdp_handle_disconn(struct sdp_sock *ssk)
   44 {
   45 
   46         sdp_dbg(ssk->socket, "%s\n", __func__);
   47 
   48         SDP_WLOCK_ASSERT(ssk);
   49         if (TCPS_HAVERCVDFIN(ssk->state) == 0)
   50                 socantrcvmore(ssk->socket);
   51 
   52         switch (ssk->state) {
   53         case TCPS_SYN_RECEIVED:
   54         case TCPS_ESTABLISHED:
   55                 ssk->state = TCPS_CLOSE_WAIT;
   56                 break;
   57 
   58         case TCPS_FIN_WAIT_1:
   59                 /* Received a reply FIN - start Infiniband tear down */
   60                 sdp_dbg(ssk->socket,
   61                     "%s: Starting Infiniband tear down sending DREQ\n",
   62                     __func__);
   63 
   64                 sdp_cancel_dreq_wait_timeout(ssk);
   65                 ssk->qp_active = 0;
   66                 if (ssk->id) {
   67                         struct rdma_cm_id *id;
   68 
   69                         id = ssk->id;
   70                         SDP_WUNLOCK(ssk);
   71                         rdma_disconnect(id);
   72                         SDP_WLOCK(ssk);
   73                 } else {
   74                         sdp_warn(ssk->socket,
   75                             "%s: ssk->id is NULL\n", __func__);
   76                         return;
   77                 }
   78                 break;
   79         case TCPS_TIME_WAIT:
   80                 /* This is a mutual close situation and we've got the DREQ from
   81                    the peer before the SDP_MID_DISCONNECT */
   82                 break;
   83         case TCPS_CLOSED:
   84                 /* FIN arrived after IB teardown started - do nothing */
   85                 sdp_dbg(ssk->socket, "%s: fin in state %s\n",
   86                     __func__, sdp_state_str(ssk->state));
   87                 return;
   88         default:
   89                 sdp_warn(ssk->socket,
   90                     "%s: FIN in unexpected state. state=%d\n",
   91                     __func__, ssk->state);
   92                 break;
   93         }
   94 }
   95 
   96 static int
   97 sdp_post_recv(struct sdp_sock *ssk)
   98 {
   99         struct sdp_buf *rx_req;
  100         int i, rc;
  101         u64 addr;
  102         struct ib_device *dev;
  103         struct ib_recv_wr rx_wr = { NULL };
  104         struct ib_sge ibsge[SDP_MAX_RECV_SGES];
  105         struct ib_sge *sge = ibsge;
  106         const struct ib_recv_wr *bad_wr;
  107         struct mbuf *mb, *m;
  108         struct sdp_bsdh *h;
  109         int id = ring_head(ssk->rx_ring);
  110 
  111         /* Now, allocate and repost recv */
  112         sdp_prf(ssk->socket, mb, "Posting mb");
  113         mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
  114         if (mb == NULL) {
  115                 /* Retry so we can't stall out with no memory. */
  116                 if (!rx_ring_posted(ssk))
  117                         queue_work(rx_comp_wq, &ssk->rx_comp_work);
  118                 return -1;
  119         }
  120         for (m = mb; m != NULL; m = m->m_next) {
  121                 m->m_len = M_SIZE(m);
  122                 mb->m_pkthdr.len += m->m_len;
  123         }
  124         h = mtod(mb, struct sdp_bsdh *);
  125         rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
  126         rx_req->mb = mb;
  127         dev = ssk->ib_device;
  128         for (i = 0;  mb != NULL; i++, mb = mb->m_next, sge++) {
  129                 addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
  130                     DMA_TO_DEVICE);
  131                 /* TODO: proper error handling */
  132                 BUG_ON(ib_dma_mapping_error(dev, addr));
  133                 BUG_ON(i >= SDP_MAX_RECV_SGES);
  134                 rx_req->mapping[i] = addr;
  135                 sge->addr = addr;
  136                 sge->length = mb->m_len;
  137                 sge->lkey = ssk->sdp_dev->pd->local_dma_lkey;
  138         }
  139 
  140         rx_wr.next = NULL;
  141         rx_wr.wr_id = id | SDP_OP_RECV;
  142         rx_wr.sg_list = ibsge;
  143         rx_wr.num_sge = i;
  144         rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
  145         if (unlikely(rc)) {
  146                 sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);
  147 
  148                 sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
  149                 m_freem(mb);
  150 
  151                 sdp_notify(ssk, ECONNRESET);
  152 
  153                 return -1;
  154         }
  155 
  156         atomic_inc(&ssk->rx_ring.head);
  157         SDPSTATS_COUNTER_INC(post_recv);
  158 
  159         return 0;
  160 }
  161 
  162 static inline int
  163 sdp_post_recvs_needed(struct sdp_sock *ssk)
  164 {
  165         unsigned long bytes_in_process;
  166         unsigned long max_bytes;
  167         int buffer_size;
  168         int posted;
  169 
  170         if (!ssk->qp_active || !ssk->socket)
  171                 return 0;
  172 
  173         posted = rx_ring_posted(ssk);
  174         if (posted >= SDP_RX_SIZE)
  175                 return 0;
  176         if (posted < SDP_MIN_TX_CREDITS)
  177                 return 1;
  178 
  179         buffer_size = ssk->recv_bytes;
  180         max_bytes = max(ssk->socket->so_rcv.sb_hiwat,
  181             (1 + SDP_MIN_TX_CREDITS) * buffer_size);
  182         max_bytes *= rcvbuf_scale;
  183         /*
  184          * Compute bytes in the receive queue and socket buffer.
  185          */
  186         bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
  187         bytes_in_process += sbused(&ssk->socket->so_rcv);
  188 
  189         return bytes_in_process < max_bytes;
  190 }
  191 
  192 static inline void
  193 sdp_post_recvs(struct sdp_sock *ssk)
  194 {
  195 
  196         while (sdp_post_recvs_needed(ssk))
  197                 if (sdp_post_recv(ssk))
  198                         return;
  199 }
  200 
  201 static inline struct mbuf *
  202 sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
  203 {
  204         struct sdp_sock *ssk = sdp_sk(sk);
  205         struct sdp_bsdh *h;
  206 
  207         h = mtod(mb, struct sdp_bsdh *);
  208 
  209 #ifdef SDP_ZCOPY
  210         SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
  211         if (h->mid == SDP_MID_SRCAVAIL) {
  212                 struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
  213                 struct rx_srcavail_state *rx_sa;
  214                 
  215                 ssk->srcavail_cancel_mseq = 0;
  216 
  217                 ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
  218                                 sizeof(struct rx_srcavail_state), M_NOWAIT);
  219 
  220                 rx_sa->mseq = ntohl(h->mseq);
  221                 rx_sa->used = 0;
  222                 rx_sa->len = mb_len = ntohl(srcah->len);
  223                 rx_sa->rkey = ntohl(srcah->rkey);
  224                 rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
  225                 rx_sa->flags = 0;
  226 
  227                 if (ssk->tx_sa) {
  228                         sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
  229                                         "for TX SrcAvail. waking up TX SrcAvail"
  230                                         "to be aborted\n");
  231                         wake_up(sk->sk_sleep);
  232                 }
  233 
  234                 atomic_add(mb->len, &ssk->rcv_nxt);
  235                 sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
  236                         mb_len, rx_sa->vaddr);
  237         } else
  238 #endif
  239         {
  240                 atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
  241         }
  242 
  243         m_adj(mb, SDP_HEAD_SIZE);
  244         SOCKBUF_LOCK(&sk->so_rcv);
  245         if (unlikely(h->flags & SDP_OOB_PRES))
  246                 sdp_urg(ssk, mb);
  247         sbappend_locked(&sk->so_rcv, mb, 0);
  248         sorwakeup_locked(sk);
  249         return mb;
  250 }
  251 
  252 static int
  253 sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
  254 {
  255 
  256         return MIN(new_size, SDP_MAX_PACKET);
  257 }
  258 
  259 int
  260 sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
  261 {
  262 
  263         ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
  264         sdp_post_recvs(ssk);
  265 
  266         return 0;
  267 }
  268 
  269 int
  270 sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
  271 {
  272         u32 curr_size = ssk->recv_bytes;
  273         u32 max_size = SDP_MAX_PACKET;
  274 
  275         if (new_size > curr_size && new_size <= max_size) {
  276                 ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
  277                 return 0;
  278         }
  279         return -1;
  280 }
  281 
  282 static void
  283 sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
  284 {
  285         if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
  286                 ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
  287         else
  288                 ssk->recv_request_head = ring_tail(ssk->rx_ring);
  289         ssk->recv_request = 1;
  290 }
  291 
  292 static void
  293 sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
  294 {
  295         u32 new_size = ntohl(buf->size);
  296 
  297         if (new_size > ssk->xmit_size_goal)
  298                 ssk->xmit_size_goal = new_size;
  299 }
  300 
  301 static struct mbuf *
  302 sdp_recv_completion(struct sdp_sock *ssk, int id)
  303 {
  304         struct sdp_buf *rx_req;
  305         struct ib_device *dev;
  306         struct mbuf *mb;
  307 
  308         if (unlikely(id != ring_tail(ssk->rx_ring))) {
  309                 printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
  310                         id, ring_tail(ssk->rx_ring));
  311                 return NULL;
  312         }
  313 
  314         dev = ssk->ib_device;
  315         rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
  316         mb = rx_req->mb;
  317         sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
  318 
  319         atomic_inc(&ssk->rx_ring.tail);
  320         atomic_dec(&ssk->remote_credits);
  321         return mb;
  322 }
  323 
  324 static void
  325 sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
  326 {
  327         struct sdp_bsdh *h;
  328         struct socket *sk;
  329 
  330         SDP_WLOCK_ASSERT(ssk);
  331 
  332         sk = ssk->socket;
  333         h = mtod(mb, struct sdp_bsdh *);
  334         switch (h->mid) {
  335         case SDP_MID_DATA:
  336         case SDP_MID_SRCAVAIL:
  337                 sdp_dbg(sk, "DATA after socket rcv was shutdown\n");
  338 
  339                 /* got data in RCV_SHUTDOWN */
  340                 if (ssk->state == TCPS_FIN_WAIT_1) {
  341                         sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
  342                         sdp_notify(ssk, ECONNRESET);
  343                 }
  344 
  345                 break;
  346 #ifdef SDP_ZCOPY
  347         case SDP_MID_RDMARDCOMPL:
  348                 break;
  349         case SDP_MID_SENDSM:
  350                 sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
  351                 break;
  352         case SDP_MID_SRCAVAIL_CANCEL:
  353                 sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
  354                 sdp_prf(sk, NULL, "Handling SrcAvailCancel");
  355                 if (ssk->rx_sa) {
  356                         ssk->srcavail_cancel_mseq = ntohl(h->mseq);
  357                         ssk->rx_sa->flags |= RX_SA_ABORTED;
  358                         ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get 
  359                                               the dirty logic from recvmsg */
  360                 } else {
  361                         sdp_dbg(sk, "Got SrcAvailCancel - "
  362                                         "but no SrcAvail in process\n");
  363                 }
  364                 break;
  365         case SDP_MID_SINKAVAIL:
  366                 sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
  367                 sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
  368                 /* FALLTHROUGH */
  369 #endif
  370         case SDP_MID_ABORT:
  371                 sdp_dbg_data(sk, "Handling ABORT\n");
  372                 sdp_prf(sk, NULL, "Handling ABORT");
  373                 sdp_notify(ssk, ECONNRESET);
  374                 break;
  375         case SDP_MID_DISCONN:
  376                 sdp_dbg_data(sk, "Handling DISCONN\n");
  377                 sdp_prf(sk, NULL, "Handling DISCONN");
  378                 sdp_handle_disconn(ssk);
  379                 break;
  380         case SDP_MID_CHRCVBUF:
  381                 sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
  382                 sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
  383                 break;
  384         case SDP_MID_CHRCVBUF_ACK:
  385                 sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
  386                 sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
  387                 break;
  388         default:
  389                 /* TODO: Handle other messages */
  390                 sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
  391                 break;
  392         }
  393         m_freem(mb);
  394 }
  395 
  396 static int
  397 sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
  398 {
  399         struct socket *sk;
  400         struct sdp_bsdh *h;
  401         unsigned long mseq_ack;
  402         int credits_before;
  403 
  404         h = mtod(mb, struct sdp_bsdh *);
  405         sk = ssk->socket;
  406         /*
  407          * If another thread is in so_pcbfree this may be partially torn
  408          * down but no further synchronization is required as the destroying
  409          * thread will wait for receive to shutdown before discarding the
  410          * socket.
  411          */
  412         if (sk == NULL) {
  413                 m_freem(mb);
  414                 return 0;
  415         }
  416 
  417         SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
  418 
  419         mseq_ack = ntohl(h->mseq_ack);
  420         credits_before = tx_credits(ssk);
  421         atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
  422                         1 + ntohs(h->bufs));
  423         if (mseq_ack >= ssk->nagle_last_unacked)
  424                 ssk->nagle_last_unacked = 0;
  425 
  426         sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
  427                 mid2str(h->mid), ntohs(h->bufs), credits_before,
  428                 tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
  429 
  430         if (unlikely(h->mid == SDP_MID_DATA &&
  431             mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
  432                 /* Credit update is valid even after RCV_SHUTDOWN */
  433                 m_freem(mb);
  434                 return 0;
  435         }
  436 
  437         if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
  438             TCPS_HAVERCVDFIN(ssk->state)) {
  439                 sdp_prf(sk, NULL, "Control mb - queing to control queue");
  440 #ifdef SDP_ZCOPY
  441                 if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
  442                         sdp_dbg_data(sk, "Got SrcAvailCancel. "
  443                                         "seq: 0x%d seq_ack: 0x%d\n",
  444                                         ntohl(h->mseq), ntohl(h->mseq_ack));
  445                         ssk->srcavail_cancel_mseq = ntohl(h->mseq);
  446                 }
  447 
  448 
  449                 if (h->mid == SDP_MID_RDMARDCOMPL) {
  450                         struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
  451                         sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
  452                         sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
  453                                         ntohl(rrch->len));
  454                 }
  455 #endif
  456                 if (mbufq_enqueue(&ssk->rxctlq, mb) != 0)
  457                         m_freem(mb);
  458                 return (0);
  459         }
  460 
  461         sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
  462         mb = sdp_sock_queue_rcv_mb(sk, mb);
  463 
  464 
  465         return 0;
  466 }
  467 
  468 /* called only from irq */
  469 static struct mbuf *
  470 sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
  471 {
  472         struct mbuf *mb;
  473         struct sdp_bsdh *h;
  474         struct socket *sk = ssk->socket;
  475         int mseq;
  476 
  477         mb = sdp_recv_completion(ssk, wc->wr_id);
  478         if (unlikely(!mb))
  479                 return NULL;
  480 
  481         if (unlikely(wc->status)) {
  482                 if (ssk->qp_active && sk) {
  483                         sdp_dbg(sk, "Recv completion with error. "
  484                             "Status %s (%d), vendor: %d\n",
  485                             ib_wc_status_msg(wc->status), wc->status,
  486                             wc->vendor_err);
  487                         sdp_abort(sk);
  488                         ssk->qp_active = 0;
  489                 }
  490                 m_freem(mb);
  491                 return NULL;
  492         }
  493 
  494         sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
  495                         (int)wc->wr_id, wc->byte_len);
  496         if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
  497                 sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
  498                                 wc->byte_len, sizeof(struct sdp_bsdh));
  499                 m_freem(mb);
  500                 return NULL;
  501         }
  502         /* Use m_adj to trim the tail of data we didn't use. */
  503         m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
  504         h = mtod(mb, struct sdp_bsdh *);
  505 
  506         SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);
  507 
  508         ssk->rx_packets++;
  509         ssk->rx_bytes += mb->m_pkthdr.len;
  510 
  511         mseq = ntohl(h->mseq);
  512         atomic_set(&ssk->mseq_ack, mseq);
  513         if (mseq != (int)wc->wr_id)
  514                 sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
  515                                 mseq, (int)wc->wr_id);
  516 
  517         return mb;
  518 }
  519 
  520 /* Wakeup writers if we now have credits. */
  521 static void
  522 sdp_bzcopy_write_space(struct sdp_sock *ssk)
  523 {
  524         struct socket *sk = ssk->socket;
  525 
  526         if (tx_credits(ssk) >= ssk->min_bufs && sk)
  527                 sowwakeup(sk);
  528 }
  529 
  530 /* only from interrupt. */
  531 static int
  532 sdp_poll_rx_cq(struct sdp_sock *ssk)
  533 {
  534         struct ib_cq *cq = ssk->rx_ring.cq;
  535         struct ib_wc ibwc[SDP_NUM_WC];
  536         int n, i;
  537         int wc_processed = 0;
  538         struct mbuf *mb;
  539 
  540         do {
  541                 n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
  542                 for (i = 0; i < n; ++i) {
  543                         struct ib_wc *wc = &ibwc[i];
  544 
  545                         BUG_ON(!(wc->wr_id & SDP_OP_RECV));
  546                         mb = sdp_process_rx_wc(ssk, wc);
  547                         if (!mb)
  548                                 continue;
  549 
  550                         sdp_process_rx_mb(ssk, mb);
  551                         wc_processed++;
  552                 }
  553         } while (n == SDP_NUM_WC);
  554 
  555         if (wc_processed)
  556                 sdp_bzcopy_write_space(ssk);
  557 
  558         return wc_processed;
  559 }
  560 
  561 static void
  562 sdp_rx_comp_work(struct work_struct *work)
  563 {
  564         struct sdp_sock *ssk = container_of(work, struct sdp_sock,
  565                         rx_comp_work);
  566 
  567         sdp_prf(ssk->socket, NULL, "%s", __func__);
  568 
  569         SDP_WLOCK(ssk);
  570         if (unlikely(!ssk->qp)) {
  571                 sdp_prf(ssk->socket, NULL, "qp was destroyed");
  572                 goto out;
  573         }
  574         if (unlikely(!ssk->rx_ring.cq)) {
  575                 sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
  576                 goto out;
  577         }
  578 
  579         if (unlikely(!ssk->poll_cq)) {
  580                 struct rdma_cm_id *id = ssk->id;
  581                 if (id && id->qp)
  582                         rdma_notify(id, IB_EVENT_COMM_EST);
  583                 goto out;
  584         }
  585 
  586         sdp_do_posts(ssk);
  587 out:
  588         SDP_WUNLOCK(ssk);
  589 }
  590 
  591 void
  592 sdp_do_posts(struct sdp_sock *ssk)
  593 {
  594         struct socket *sk = ssk->socket;
  595         int xmit_poll_force;
  596         struct mbuf *mb;
  597 
  598         SDP_WLOCK_ASSERT(ssk);
  599         if (!ssk->qp_active) {
  600                 sdp_dbg(sk, "QP is deactivated\n");
  601                 return;
  602         }
  603 
  604         while ((mb = mbufq_dequeue(&ssk->rxctlq)) != NULL)
  605                 sdp_process_rx_ctl_mb(ssk, mb);
  606 
  607         if (ssk->state == TCPS_TIME_WAIT)
  608                 return;
  609 
  610         if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
  611                 return;
  612 
  613         sdp_post_recvs(ssk);
  614 
  615         if (tx_ring_posted(ssk))
  616                 sdp_xmit_poll(ssk, 1);
  617 
  618         sdp_post_sends(ssk, M_NOWAIT);
  619 
  620         xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;
  621 
  622         if (credit_update_needed(ssk) || xmit_poll_force) {
  623                 /* if has pending tx because run out of tx_credits - xmit it */
  624                 sdp_prf(sk, NULL, "Processing to free pending sends");
  625                 sdp_xmit_poll(ssk,  xmit_poll_force);
  626                 sdp_prf(sk, NULL, "Sending credit update");
  627                 sdp_post_sends(ssk, M_NOWAIT);
  628         }
  629 
  630 }
  631 
  632 int
  633 sdp_process_rx(struct sdp_sock *ssk)
  634 {
  635         int wc_processed = 0;
  636         int credits_before;
  637 
  638         if (!rx_ring_trylock(&ssk->rx_ring)) {
  639                 sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
  640                 return 0;
  641         }
  642 
  643         credits_before = tx_credits(ssk);
  644 
  645         wc_processed = sdp_poll_rx_cq(ssk);
  646         sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);
  647 
  648         if (wc_processed) {
  649                 sdp_prf(ssk->socket, NULL, "credits:  %d -> %d",
  650                                 credits_before, tx_credits(ssk));
  651                 queue_work(rx_comp_wq, &ssk->rx_comp_work);
  652         }
  653         sdp_arm_rx_cq(ssk);
  654 
  655         rx_ring_unlock(&ssk->rx_ring);
  656 
  657         return (wc_processed);
  658 }
  659 
  660 static void
  661 sdp_rx_irq(struct ib_cq *cq, void *cq_context)
  662 {
  663         struct sdp_sock *ssk;
  664 
  665         ssk = cq_context;
  666         KASSERT(cq == ssk->rx_ring.cq,
  667             ("%s: mismatched cq on %p", __func__, ssk));
  668 
  669         SDPSTATS_COUNTER_INC(rx_int_count);
  670 
  671         sdp_prf(sk, NULL, "rx irq");
  672 
  673         sdp_process_rx(ssk);
  674 }
  675 
  676 static
  677 void sdp_rx_ring_purge(struct sdp_sock *ssk)
  678 {
  679         while (rx_ring_posted(ssk) > 0) {
  680                 struct mbuf *mb;
  681                 mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
  682                 if (!mb)
  683                         break;
  684                 m_freem(mb);
  685         }
  686 }
  687 
  688 void
  689 sdp_rx_ring_init(struct sdp_sock *ssk)
  690 {
  691         ssk->rx_ring.buffer = NULL;
  692         ssk->rx_ring.destroyed = 0;
  693         rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
  694 }
  695 
  696 static void
  697 sdp_rx_cq_event_handler(struct ib_event *event, void *data)
  698 {
  699 }
  700 
  701 int
  702 sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
  703 {
  704         struct ib_cq_init_attr rx_cq_attr = {
  705                 .cqe = SDP_RX_SIZE,
  706                 .comp_vector = 0,
  707                 .flags = 0,
  708         };
  709         struct ib_cq *rx_cq;
  710         int rc = 0;
  711 
  712         sdp_dbg(ssk->socket, "rx ring created");
  713         INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
  714         atomic_set(&ssk->rx_ring.head, 1);
  715         atomic_set(&ssk->rx_ring.tail, 1);
  716 
  717         ssk->rx_ring.buffer = malloc(sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE,
  718             M_SDP, M_WAITOK);
  719 
  720         rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
  721             ssk, &rx_cq_attr);
  722         if (IS_ERR(rx_cq)) {
  723                 rc = PTR_ERR(rx_cq);
  724                 sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
  725                 goto err_cq;
  726         }
  727 
  728         sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
  729         sdp_arm_rx_cq(ssk);
  730 
  731         return 0;
  732 
  733 err_cq:
  734         free(ssk->rx_ring.buffer, M_SDP);
  735         ssk->rx_ring.buffer = NULL;
  736         return rc;
  737 }
  738 
  739 void
  740 sdp_rx_ring_destroy(struct sdp_sock *ssk)
  741 {
  742 
  743         cancel_work_sync(&ssk->rx_comp_work);
  744         rx_ring_destroy_lock(&ssk->rx_ring);
  745 
  746         if (ssk->rx_ring.buffer) {
  747                 sdp_rx_ring_purge(ssk);
  748                 free(ssk->rx_ring.buffer, M_SDP);
  749                 ssk->rx_ring.buffer = NULL;
  750         }
  751 
  752         if (ssk->rx_ring.cq) {
  753                 ib_destroy_cq(ssk->rx_ring.cq);
  754                 ssk->rx_ring.cq = NULL;
  755         }
  756 
  757         WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
  758 }

Cache object: 725fbd12620577e8c55e1f846c46016a


[ source navigation ] [ diff markup ] [ identifier search ] [ freetext search ] [ file search ] [ list types ] [ track identifier ]


This page is part of the FreeBSD/Linux Kernel Cross-Reference, and was automatically generated using a modified version of the LXR engine.