Statistics
| Branch: | Tag: | Revision:

root / host / lib / transport / usb_zero_copy_wrapper.cpp @ 64e3cd12

History | View | Annotate | Download (8.06 KB)

1
//
2
// Copyright 2011-2012 Ettus Research LLC
3
//
4
// This program is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8
//
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13
//
14
// You should have received a copy of the GNU General Public License
15
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
//
17

    
18
#include "simple_claimer.hpp"
19
#include <uhd/transport/usb_zero_copy.hpp>
20
#include <uhd/transport/buffer_pool.hpp>
21
#include <uhd/utils/byteswap.hpp>
22
#include <uhd/utils/msg.hpp>
23
#include <uhd/utils/tasks.hpp>
24
#include <boost/foreach.hpp>
25
#include <boost/make_shared.hpp>
26
#include <boost/thread/mutex.hpp>
27
#include <boost/thread/condition_variable.hpp>
28
#include <boost/bind.hpp>
29
#include <vector>
30
#include <iostream>
31

    
32
using namespace uhd::transport;
33

    
34
static const boost::posix_time::time_duration AUTOFLUSH_TIMEOUT(boost::posix_time::milliseconds(1));
35

    
36
/***********************************************************************
37
 * USB zero copy wrapper - managed receive buffer
38
 **********************************************************************/
39
class usb_zero_copy_wrapper_mrb : public managed_recv_buffer{
40
public:
41
    usb_zero_copy_wrapper_mrb(void){/*NOP*/}
42

    
43
    void release(void){
44
        _mrb.reset(); //decrement ref count, other MRB's may hold a ref
45
        _claimer.release();
46
    }
47

    
48
    UHD_INLINE sptr get_new(
49
        managed_recv_buffer::sptr &mrb, size_t &offset_bytes,
50
        const double timeout, size_t &index
51
    ){
52
        if (not mrb or not _claimer.claim_with_wait(timeout)) return sptr();
53

    
54
        index++; //advances the caller's buffer
55

    
56
        //hold a copy of the buffer shared pointer
57
        _mrb = mrb;
58

    
59
        //extract this packet's memory address and length in bytes
60
        char *mem = mrb->cast<char *>() + offset_bytes;
61
        const boost::uint32_t *mem32 = reinterpret_cast<const boost::uint32_t *>(mem);
62
        const size_t words32 = (uhd::wtohx(mem32[0]) & 0xffff); //length in words32 (from VRT header)
63
        const size_t len = words32*sizeof(boost::uint32_t); //length in bytes
64

    
65
        //check if this receive buffer has been exhausted
66
        offset_bytes += len;
67
        if (offset_bytes >= mrb->size()) mrb.reset(); //drop caller's ref
68
        else if (uhd::wtohx(mem32[words32]) == 0) mrb.reset();
69

    
70
        return make(this, mem, len);
71
    }
72

    
73
private:
74
    managed_recv_buffer::sptr _mrb;
75
    simple_claimer _claimer;
76
};
77

    
78
/***********************************************************************
79
 * USB zero copy wrapper - managed send buffer
80
 **********************************************************************/
81
class usb_zero_copy_wrapper_msb : public managed_send_buffer{
82
public:
83
    usb_zero_copy_wrapper_msb(const usb_zero_copy::sptr internal, const size_t fragmentation_size):
84
        _internal(internal), _fragmentation_size(fragmentation_size)
85
    {
86
        _ok_to_auto_flush = false;
87
        _task = uhd::task::make(boost::bind(&usb_zero_copy_wrapper_msb::auto_flush, this));
88
    }
89

    
90
    ~usb_zero_copy_wrapper_msb(void)
91
    {
92
        //ensure the task has exited before anything auto deconstructs
93
        _task.reset();
94
    }
95

    
96
    void release(void){
97
        boost::mutex::scoped_lock lock(_mutex);
98
        _ok_to_auto_flush = true;
99

    
100
        //get a reference to the VITA header before incrementing
101
        const boost::uint32_t vita_header = reinterpret_cast<const boost::uint32_t *>(_mem_buffer_tip)[0];
102

    
103
        _bytes_in_buffer += size();
104
        _mem_buffer_tip += size();
105

    
106
        //extract VITA end of packet flag, we must force flush under eof conditions
107
        const bool eop = (uhd::wtohx(vita_header) & (0x1 << 24)) != 0;
108
        const bool full = _bytes_in_buffer >= (_last_send_buff->size() - _fragmentation_size);
109
        if (eop or full){
110
            _last_send_buff->commit(_bytes_in_buffer);
111
            _last_send_buff.reset();
112

    
113
            //notify the auto-flusher to restart its timed_wait
114
            lock.unlock(); _cond.notify_one();
115
        }
116
    }
117

    
118
    UHD_INLINE sptr get_new(const double timeout){
119
        boost::mutex::scoped_lock lock(_mutex);
120
        _ok_to_auto_flush = false;
121

    
122
        if (not _last_send_buff){
123
            _last_send_buff = _internal->get_send_buff(timeout);
124
            if (not _last_send_buff) return sptr();
125
            _mem_buffer_tip = _last_send_buff->cast<char *>();
126
            _bytes_in_buffer = 0;
127
        }
128

    
129
        return make(this, _mem_buffer_tip, _fragmentation_size);
130
    }
131

    
132
private:
133
    usb_zero_copy::sptr _internal;
134
    const size_t _fragmentation_size;
135
    managed_send_buffer::sptr _last_send_buff;
136
    size_t _bytes_in_buffer;
137
    char *_mem_buffer_tip;
138

    
139
    //private variables for auto flusher
140
    boost::mutex _mutex;
141
    boost::condition_variable _cond;
142
    uhd::task::sptr _task;
143
    bool _ok_to_auto_flush;
144

    
145
    /*!
146
     * The auto flusher ensures that buffers are force committed when
147
     * the user has not called get_new() within a certain time window.
148
     */
149
    void auto_flush(void)
150
    {
151
        boost::mutex::scoped_lock lock(_mutex);
152
        const bool timeout = not _cond.timed_wait(lock, AUTOFLUSH_TIMEOUT);
153
        if (timeout and _ok_to_auto_flush and _last_send_buff and _bytes_in_buffer != 0)
154
        {
155
            _last_send_buff->commit(_bytes_in_buffer);
156
            _last_send_buff.reset();
157
        }
158
    }
159
};
160

    
161
/***********************************************************************
162
 * USB zero copy wrapper implementation
163
 **********************************************************************/
164
class usb_zero_copy_wrapper : public usb_zero_copy{
165
public:
166
    usb_zero_copy_wrapper(sptr usb_zc, const size_t frame_boundary):
167
        _internal_zc(usb_zc),
168
        _frame_boundary(frame_boundary),
169
        _next_recv_buff_index(0)
170
    {
171
        for (size_t i = 0; i < this->get_num_recv_frames(); i++){
172
            _mrb_pool.push_back(boost::make_shared<usb_zero_copy_wrapper_mrb>());
173
        }
174
        _the_only_msb = boost::make_shared<usb_zero_copy_wrapper_msb>(usb_zc, frame_boundary);
175
    }
176

    
177
    managed_recv_buffer::sptr get_recv_buff(double timeout){
178
        //attempt to get a managed recv buffer
179
        if (not _last_recv_buff){
180
            _last_recv_buff = _internal_zc->get_recv_buff(timeout);
181
            _last_recv_offset = 0; //reset offset into buffer
182
        }
183

    
184
        //get the buffer to be returned to the user
185
        if (_next_recv_buff_index == _mrb_pool.size()) _next_recv_buff_index = 0;
186
        return _mrb_pool[_next_recv_buff_index]->get_new(
187
            _last_recv_buff, _last_recv_offset, timeout, _next_recv_buff_index
188
        );
189
    }
190

    
191
    size_t get_num_recv_frames(void) const{
192
        return _internal_zc->get_num_recv_frames();
193
    }
194

    
195
    size_t get_recv_frame_size(void) const{
196
        return std::min(_frame_boundary, _internal_zc->get_recv_frame_size());
197
    }
198

    
199
    managed_send_buffer::sptr get_send_buff(double timeout){
200
        return _the_only_msb->get_new(timeout);
201
    }
202

    
203
    size_t get_num_send_frames(void) const{
204
        return _internal_zc->get_num_send_frames();
205
    }
206

    
207
    size_t get_send_frame_size(void) const{
208
        return std::min(_frame_boundary, _internal_zc->get_send_frame_size());
209
    }
210

    
211
private:
212
    sptr _internal_zc;
213
    size_t _frame_boundary;
214
    std::vector<boost::shared_ptr<usb_zero_copy_wrapper_mrb> > _mrb_pool;
215
    boost::shared_ptr<usb_zero_copy_wrapper_msb> _the_only_msb;
216

    
217
    //state for last recv buffer to create multiple managed buffers
218
    managed_recv_buffer::sptr _last_recv_buff;
219
    size_t _last_recv_offset;
220
    size_t _next_recv_buff_index;
221
};
222

    
223
/***********************************************************************
224
 * USB zero copy wrapper factory function
225
 **********************************************************************/
226
usb_zero_copy::sptr usb_zero_copy::make_wrapper(
227
    sptr usb_zc, size_t usb_frame_boundary
228
){
229
    return sptr(new usb_zero_copy_wrapper(usb_zc, usb_frame_boundary));
230
}