FreeBSD/Linux Kernel Cross Reference
sys/ofed/drivers/infiniband/ulp/sdp/sdp_tx.c

    1 /*-
    2  * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
    3  *
    4  * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
    5  *
    6  * This software is available to you under a choice of one of two
    7  * licenses.  You may choose to be licensed under the terms of the GNU
    8  * General Public License (GPL) Version 2, available from the file
    9  * COPYING in the main directory of this source tree, or the
   10  * OpenIB.org BSD license below:
   11  *
   12  *     Redistribution and use in source and binary forms, with or
   13  *     without modification, are permitted provided that the following
   14  *     conditions are met:
   15  *
   16  *      - Redistributions of source code must retain the above
   17  *        copyright notice, this list of conditions and the following
   18  *        disclaimer.
   19  *
   20  *      - Redistributions in binary form must reproduce the above
   21  *        copyright notice, this list of conditions and the following
   22  *        disclaimer in the documentation and/or other materials
   23  *        provided with the distribution.
   24  *
   25  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
   26  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
   27  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
   28  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
   29  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
   30  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
   31  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
   32  * SOFTWARE.
   33  */
   34 #include "sdp.h"
   35 
   36 #define sdp_cnt(var) do { (var)++; } while (0)
   37 
   38 SDP_MODPARAM_SINT(sdp_keepalive_probes_sent, 0,
   39                 "Total number of keepalive probes sent.");
   40 
   41 static int sdp_process_tx_cq(struct sdp_sock *ssk);
   42 static void sdp_poll_tx_timeout(void *data);
   43 
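      /*
       * Poll the TX CQ from the transmit path.  Called with the socket
       * write lock held; arms the TX poll timer if one is not already
       * pending and processes completions either when forced or once every
       * SDP_TX_POLL_MODER calls.  Returns the number of work completions
       * processed.
       */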
   44 int
   45 sdp_xmit_poll(struct sdp_sock *ssk, int force)
   46 {
   47         int wc_processed = 0;
   48 
   49         SDP_WLOCK_ASSERT(ssk);
   50         sdp_prf(ssk->socket, NULL, "%s", __func__);
   51 
    52         /* If we don't have a pending timer, set one up to catch our recent
    53          * post in case the interface becomes idle. */
   54         if (!callout_pending(&ssk->tx_ring.timer))
   55                 callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
   56                     sdp_poll_tx_timeout, ssk);
   57 
   58         /* Poll the CQ every SDP_TX_POLL_MODER packets */
   59         if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
   60                 wc_processed = sdp_process_tx_cq(ssk);
   61 
   62         return wc_processed;
   63 }
   64 
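      /*
       * Transmit a single SDP packet.  Fills in the BSDH header (advertised
       * receive credits, sequence number and sequence ack), DMA-maps every
       * mbuf in the chain into a scatter/gather entry and posts one
       * signalled IB send work request.  On success the ring head advances
       * and a credit is consumed; on failure the buffer is unmapped, the
       * mbuf is freed and the connection is reset.
       */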
   65 void
   66 sdp_post_send(struct sdp_sock *ssk, struct mbuf *mb)
   67 {
   68         struct sdp_buf *tx_req;
   69         struct sdp_bsdh *h;
   70         unsigned long mseq;
   71         struct ib_device *dev;
   72         const struct ib_send_wr *bad_wr;
   73         struct ib_sge ibsge[SDP_MAX_SEND_SGES];
   74         struct ib_sge *sge;
   75         struct ib_send_wr tx_wr = { NULL };
   76         int i, rc;
   77         u64 addr;
   78 
   79         SDPSTATS_COUNTER_MID_INC(post_send, h->mid);
   80         SDPSTATS_HIST(send_size, mb->len);
   81 
   82         if (!ssk->qp_active) {
   83                 m_freem(mb);
   84                 return;
   85         }
   86 
   87         mseq = ring_head(ssk->tx_ring);
   88         h = mtod(mb, struct sdp_bsdh *);
   89         ssk->tx_packets++;
   90         ssk->tx_bytes += mb->m_pkthdr.len;
   91 
   92 #ifdef SDP_ZCOPY
   93         if (unlikely(h->mid == SDP_MID_SRCAVAIL)) {
   94                 struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(mb);
   95                 if (ssk->tx_sa != tx_sa) {
   96                         sdp_dbg_data(ssk->socket, "SrcAvail cancelled "
   97                                         "before being sent!\n");
   98                         WARN_ON(1);
   99                         m_freem(mb);
  100                         return;
  101                 }
  102                 TX_SRCAVAIL_STATE(mb)->mseq = mseq;
  103         }
  104 #endif
  105 
  106         if (unlikely(mb->m_flags & M_URG))
  107                 h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
  108         else
  109                 h->flags = 0;
  110 
  111         mb->m_flags |= M_RDONLY; /* Don't allow compression once sent. */
  112         h->bufs = htons(rx_ring_posted(ssk));
  113         h->len = htonl(mb->m_pkthdr.len);
  114         h->mseq = htonl(mseq);
  115         h->mseq_ack = htonl(mseq_ack(ssk));
  116 
  117         sdp_prf1(ssk->socket, mb, "TX: %s bufs: %d mseq:%ld ack:%d",
  118                         mid2str(h->mid), rx_ring_posted(ssk), mseq,
  119                         ntohl(h->mseq_ack));
  120 
  121         SDP_DUMP_PACKET(ssk->socket, "TX", mb, h);
  122 
  123         tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
  124         tx_req->mb = mb;
  125         dev = ssk->ib_device;
  126         sge = &ibsge[0];
  127         for (i = 0;  mb != NULL; i++, mb = mb->m_next, sge++) {
  128                 addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
  129                     DMA_TO_DEVICE);
  130                 /* TODO: proper error handling */
  131                 BUG_ON(ib_dma_mapping_error(dev, addr));
  132                 BUG_ON(i >= SDP_MAX_SEND_SGES);
  133                 tx_req->mapping[i] = addr;
  134                 sge->addr = addr;
  135                 sge->length = mb->m_len;
  136                 sge->lkey = ssk->sdp_dev->pd->local_dma_lkey;
  137         }
  138         tx_wr.next = NULL;
  139         tx_wr.wr_id = mseq | SDP_OP_SEND;
  140         tx_wr.sg_list = ibsge;
  141         tx_wr.num_sge = i;
  142         tx_wr.opcode = IB_WR_SEND;
  143         tx_wr.send_flags = IB_SEND_SIGNALED;
  144         if (unlikely(tx_req->mb->m_flags & M_URG))
  145                 tx_wr.send_flags |= IB_SEND_SOLICITED;
  146 
  147         rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
  148         if (unlikely(rc)) {
  149                 sdp_dbg(ssk->socket,
  150                                 "ib_post_send failed with status %d.\n", rc);
  151 
  152                 sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);
  153 
  154                 sdp_notify(ssk, ECONNRESET);
  155                 m_freem(tx_req->mb);
  156                 return;
  157         }
  158 
  159         atomic_inc(&ssk->tx_ring.head);
  160         atomic_dec(&ssk->tx_ring.credits);
  161         atomic_set(&ssk->remote_credits, rx_ring_posted(ssk));
  162 
  163         return;
  164 }
  165 
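      /*
       * Reclaim the buffer of a completed send.  Completions must arrive in
       * ring order; the buffer is unmapped, the ring tail is advanced and
       * the original mbuf is returned for the caller to free.
       */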
  166 static struct mbuf *
  167 sdp_send_completion(struct sdp_sock *ssk, int mseq)
  168 {
  169         struct ib_device *dev;
  170         struct sdp_buf *tx_req;
  171         struct mbuf *mb = NULL;
  172         struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
  173 
  174         if (unlikely(mseq != ring_tail(*tx_ring))) {
  175                 printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
  176                         mseq, ring_tail(*tx_ring));
  177                 goto out;
  178         }
  179 
  180         dev = ssk->ib_device;
  181         tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)];
  182         mb = tx_req->mb;
  183         sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);
  184 
  185 #ifdef SDP_ZCOPY
  186         /* TODO: AIO and real zcopy code; add their context support here */
  187         if (BZCOPY_STATE(mb))
  188                 BZCOPY_STATE(mb)->busy--;
  189 #endif
  190 
  191         atomic_inc(&tx_ring->tail);
  192 
  193 out:
  194         return mb;
  195 }
  196 
  197 static int
  198 sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
  199 {
  200         struct mbuf *mb = NULL;
  201         struct sdp_bsdh *h;
  202 
  203         if (unlikely(wc->status)) {
  204                 if (wc->status != IB_WC_WR_FLUSH_ERR) {
  205                         sdp_prf(ssk->socket, mb, "Send completion with error. "
  206                                 "Status %d", wc->status);
  207                         sdp_dbg_data(ssk->socket, "Send completion with error. "
  208                                 "Status %d\n", wc->status);
  209                         sdp_notify(ssk, ECONNRESET);
  210                 }
  211         }
  212 
  213         mb = sdp_send_completion(ssk, wc->wr_id);
  214         if (unlikely(!mb))
  215                 return -1;
  216 
  217         h = mtod(mb, struct sdp_bsdh *);
  218         sdp_prf1(ssk->socket, mb, "tx completion. mseq:%d", ntohl(h->mseq));
  219         sdp_dbg(ssk->socket, "tx completion. %p %d mseq:%d",
  220             mb, mb->m_pkthdr.len, ntohl(h->mseq));
  221         m_freem(mb);
  222 
  223         return 0;
  224 }
  225 
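      /*
       * Dispatch one TX work completion: data sends go to
       * sdp_handle_send_comp(), RDMA read completions (SDP_ZCOPY) wake the
       * waiting sender, and anything else is treated as the completion of
       * a keepalive probe.
       */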
  226 static inline void
  227 sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
  228 {
  229 
  230         if (likely(wc->wr_id & SDP_OP_SEND)) {
  231                 sdp_handle_send_comp(ssk, wc);
  232                 return;
  233         }
  234 
  235 #ifdef SDP_ZCOPY
  236         if (wc->wr_id & SDP_OP_RDMA) {
  237                 /* TODO: handle failed RDMA read cqe */
  238 
  239                 sdp_dbg_data(ssk->socket,
  240                     "TX comp: RDMA read. status: %d\n", wc->status);
  241                 sdp_prf1(ssk->socket, NULL, "TX comp: RDMA read");
  242 
  243                 if (!ssk->tx_ring.rdma_inflight) {
  244                         sdp_warn(ssk->socket, "ERROR: unexpected RDMA read\n");
  245                         return;
  246                 }
  247 
  248                 if (!ssk->tx_ring.rdma_inflight->busy) {
  249                         sdp_warn(ssk->socket,
  250                             "ERROR: too many RDMA read completions\n");
  251                         return;
  252                 }
  253 
  254                 /* Only the last RDMA read WR is signalled.  Ordering is
  255                  * guaranteed, so once the last RDMA read WR has completed,
  256                  * all of the others have completed as well. */
  257                 ssk->tx_ring.rdma_inflight->busy = 0;
  258                 sowwakeup(ssk->socket);
  259                 sdp_dbg_data(ssk->socket, "woke up sleepers\n");
  260                 return;
  261         }
  262 #endif
  263 
  264         /* Keepalive probe sent cleanup */
  265         sdp_cnt(sdp_keepalive_probes_sent);
  266 
  267         if (likely(!wc->status))
  268                 return;
  269 
  270         sdp_dbg(ssk->socket, " %s consumes KEEPALIVE status %d\n",
  271                         __func__, wc->status);
  272 
  273         if (wc->status == IB_WC_WR_FLUSH_ERR)
  274                 return;
  275 
  276         sdp_notify(ssk, ECONNRESET);
  277 }
  278 
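      /*
       * Drain the TX CQ.  Work completions are polled in batches of
       * SDP_NUM_WC; if any were processed, new sends are posted and writers
       * sleeping on the socket are woken up.
       */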
  279 static int
  280 sdp_process_tx_cq(struct sdp_sock *ssk)
  281 {
  282         struct ib_wc ibwc[SDP_NUM_WC];
  283         int n, i;
  284         int wc_processed = 0;
  285 
  286         SDP_WLOCK_ASSERT(ssk);
  287 
  288         if (!ssk->tx_ring.cq) {
  289                 sdp_dbg(ssk->socket, "tx irq on destroyed tx_cq\n");
  290                 return 0;
  291         }
  292 
  293         do {
  294                 n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
  295                 for (i = 0; i < n; ++i) {
  296                         sdp_process_tx_wc(ssk, ibwc + i);
  297                         wc_processed++;
  298                 }
  299         } while (n == SDP_NUM_WC);
  300 
  301         if (wc_processed) {
  302                 sdp_post_sends(ssk, M_NOWAIT);
  303                 sdp_prf1(ssk->socket, NULL, "Waking sendmsg. inflight=%d",
  304                                 (u32) tx_ring_posted(ssk));
  305                 sowwakeup(ssk->socket);
  306         }
  307 
  308         return wc_processed;
  309 }
  310 
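      /*
       * Common TX completion processing for the poll timer and the CQ
       * interrupt.  Polls the TX CQ and, while sends remain in flight,
       * rearms the poll timer so that their completions are eventually
       * processed.
       */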
  311 static void
  312 sdp_poll_tx(struct sdp_sock *ssk)
  313 {
  314         struct socket *sk = ssk->socket;
  315         u32 inflight, wc_processed;
  316 
  317         sdp_prf1(ssk->socket, NULL, "TX timeout: inflight=%d, head=%d tail=%d", 
  318                 (u32) tx_ring_posted(ssk),
  319                 ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
  320 
  321         if (unlikely(ssk->state == TCPS_CLOSED)) {
  322                 sdp_warn(sk, "Socket is closed\n");
  323                 goto out;
  324         }
  325 
  326         wc_processed = sdp_process_tx_cq(ssk);
  327         if (!wc_processed)
  328                 SDPSTATS_COUNTER_INC(tx_poll_miss);
  329         else
  330                 SDPSTATS_COUNTER_INC(tx_poll_hit);
  331 
  332         inflight = (u32) tx_ring_posted(ssk);
  333         sdp_prf1(ssk->socket, NULL, "finished tx processing. inflight = %d",
  334             inflight);
  335 
  336         /* If there are still packets in flight and the timer has not already
  337          * been scheduled by the Tx routine then schedule it here to guarantee
  338          * completion processing of these packets */
  339         if (inflight)
  340                 callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
  341                     sdp_poll_tx_timeout, ssk);
  342 out:
  343 #ifdef SDP_ZCOPY
  344         if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) {
  345                 sdp_prf1(sk, NULL, "RDMA is inflight - arming irq");
  346                 sdp_arm_tx_cq(ssk);
  347         }
  348 #endif
  349         return;
  350 }
  351 
  352 static void
  353 sdp_poll_tx_timeout(void *data)
  354 {
  355         struct sdp_sock *ssk = (struct sdp_sock *)data;
  356 
  357         if (!callout_active(&ssk->tx_ring.timer))
  358                 return;
  359         callout_deactivate(&ssk->tx_ring.timer);
  360         sdp_poll_tx(ssk);
  361 }
  362 
  363 static void
  364 sdp_tx_irq(struct ib_cq *cq, void *cq_context)
  365 {
  366         struct sdp_sock *ssk;
  367 
  368         ssk = cq_context;
  369         sdp_prf1(ssk->socket, NULL, "tx irq");
  370         sdp_dbg_data(ssk->socket, "Got tx comp interrupt\n");
  371         SDPSTATS_COUNTER_INC(tx_int_count);
  372         SDP_WLOCK(ssk);
  373         sdp_poll_tx(ssk);
  374         SDP_WUNLOCK(ssk);
  375 }
  376 
  377 static void
  378 sdp_tx_ring_purge(struct sdp_sock *ssk)
  379 {
  380         while (tx_ring_posted(ssk)) {
  381                 struct mbuf *mb;
  382                 mb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
  383                 if (!mb)
  384                         break;
  385                 m_freem(mb);
  386         }
  387 }
  388 
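      /*
       * Post a keepalive probe as a zero-length RDMA write on the QP; the
       * connection is reset if the post fails.
       */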
  389 void
  390 sdp_post_keepalive(struct sdp_sock *ssk)
  391 {
  392         int rc;
  393         struct ib_send_wr wr;
  394         const struct ib_send_wr *bad_wr;
  395 
  396         sdp_dbg(ssk->socket, "%s\n", __func__);
  397 
  398         memset(&wr, 0, sizeof(wr));
  399 
  400         wr.next    = NULL;
  401         wr.wr_id   = 0;
  402         wr.sg_list = NULL;
  403         wr.num_sge = 0;
  404         wr.opcode  = IB_WR_RDMA_WRITE;
  405 
  406         rc = ib_post_send(ssk->qp, &wr, &bad_wr);
  407         if (rc) {
  408                 sdp_dbg(ssk->socket,
  409                         "ib_post_keepalive failed with status %d.\n", rc);
  410                 sdp_notify(ssk, ECONNRESET);
  411         }
  412 
  413         sdp_cnt(sdp_keepalive_probes_sent);
  414 }
  415 
  416 static void
  417 sdp_tx_cq_event_handler(struct ib_event *event, void *data)
  418 {
  419 }
  420 
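      /*
       * Set up the transmit side: initialize the poll and nagle timers and
       * the ring indices, allocate the TX buffer ring, and create and arm
       * the TX completion queue.
       */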
  421 int
  422 sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
  423 {
  424         struct ib_cq_init_attr tx_cq_attr = {
  425                 .cqe = SDP_TX_SIZE,
  426                 .comp_vector = 0,
  427                 .flags = 0,
  428         };
  429         struct ib_cq *tx_cq;
  430         int rc = 0;
  431 
  432         sdp_dbg(ssk->socket, "tx ring create\n");
  433         callout_init_rw(&ssk->tx_ring.timer, &ssk->lock, 0);
  434         callout_init_rw(&ssk->nagle_timer, &ssk->lock, 0);
  435         atomic_set(&ssk->tx_ring.head, 1);
  436         atomic_set(&ssk->tx_ring.tail, 1);
  437 
  438         ssk->tx_ring.buffer = malloc(sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE,
  439             M_SDP, M_WAITOK);
  440 
  441         tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
  442                           ssk, &tx_cq_attr);
  443         if (IS_ERR(tx_cq)) {
  444                 rc = PTR_ERR(tx_cq);
  445                 sdp_warn(ssk->socket, "Unable to allocate TX CQ: %d.\n", rc);
  446                 goto err_cq;
  447         }
  448         ssk->tx_ring.cq = tx_cq;
  449         ssk->tx_ring.poll_cnt = 0;
  450         sdp_arm_tx_cq(ssk);
  451 
  452         return 0;
  453 
  454 err_cq:
  455         free(ssk->tx_ring.buffer, M_SDP);
  456         ssk->tx_ring.buffer = NULL;
  457         return rc;
  458 }
  459 
  460 void
  461 sdp_tx_ring_destroy(struct sdp_sock *ssk)
  462 {
  463 
  464         sdp_dbg(ssk->socket, "tx ring destroy\n");
  465         SDP_WLOCK(ssk);
  466         callout_stop(&ssk->tx_ring.timer);
  467         callout_stop(&ssk->nagle_timer);
  468         SDP_WUNLOCK(ssk);
  469         callout_drain(&ssk->tx_ring.timer);
  470         callout_drain(&ssk->nagle_timer);
  471 
  472         if (ssk->tx_ring.buffer) {
  473                 sdp_tx_ring_purge(ssk);
  474                 free(ssk->tx_ring.buffer, M_SDP);
  475                 ssk->tx_ring.buffer = NULL;
  476         }
  477 
  478         if (ssk->tx_ring.cq) {
  479                 ib_destroy_cq(ssk->tx_ring.cq);
  480                 ssk->tx_ring.cq = NULL;
  481         }
  482 
  483         WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
  484 }
