805 lines
28 KiB
C++
805 lines
28 KiB
C++
// parallel.cpp
|
|
/*
|
|
* Copyright 2010 10gen Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
|
|
#include "pch.h"
|
|
#include "parallel.h"
|
|
#include "connpool.h"
|
|
#include "../db/queryutil.h"
|
|
#include "../db/dbmessage.h"
|
|
#include "../s/util.h"
|
|
#include "../s/shard.h"
|
|
|
|
namespace mongo {
|
|
|
|
// -------- ClusteredCursor -----------
|
|
|
|
// Builds a clustered cursor from an incoming query message.
// The namespace, query options, a copy of the query and a copy of the field
// selector are taken from `q`. The cursor starts out neither done nor
// initialized; init() must be called before querying.
ClusteredCursor::ClusteredCursor( QueryMessage& q ) {
    _didInit = false;
    _done = false;

    _ns = q.ns;
    _options = q.queryOptions;
    _query = q.query.copy();
    _fields = q.fields.copy();

    // A requested ntoreturn of exactly 1 is stored as a batch size of 2;
    // every other value is used unchanged.
    _batchSize = ( q.ntoreturn == 1 ) ? 2 : q.ntoreturn;
}
|
|
|
|
// Builds a clustered cursor from explicit pieces rather than a query message.
// Owned copies of the query and the field selector are stored. The batch size
// is 0 for cursors built this way. The cursor starts out neither done nor
// initialized.
ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ) {
    _didInit = false;
    _done = false;

    _ns = ns;
    _options = options;
    _batchSize = 0;

    _query = q.getOwned();
    _fields = fields.getOwned();
}
|
|
|
|
// Flags the cursor as done on destruction.
ClusteredCursor::~ClusteredCursor() {
    // defensive only: nothing should use the cursor past this point
    _done = true;
}
|
|
|
|
// Runs _init() the first time it is called; later calls do nothing.
// _didInit is set before _init() is invoked.
void ClusteredCursor::init() {
    if ( ! _didInit ) {
        _didInit = true;
        _init();
    }
}
|
|
|
|
// Inspects the result flags of a freshly opened cursor and turns error
// conditions into exceptions.
//   - ResultFlag_ShardConfigStale -> StaleConfigException for _ns
//   - ResultFlag_ErrSet           -> UserException built from the "code" and
//                                    "$err" fields of the cursor's next document
// `cursor` must be non-null (asserted).
void ClusteredCursor::_checkCursor( DBClientCursor * cursor ) {
    assert( cursor );

    const bool configIsStale = cursor->hasResultFlag( ResultFlag_ShardConfigStale );
    if ( configIsStale )
        throw StaleConfigException( _ns , "ClusteredCursor::_checkCursor" );

    if ( ! cursor->hasResultFlag( ResultFlag_ErrSet ) )
        return;

    // the error details are read from the next document of the cursor
    BSONObj errObj = cursor->next();
    throw UserException( errObj["code"].numberInt() , errObj["$err"].String() );
}
|
|
|
|
// Runs this cursor's query (optionally AND-ed with `extra`) against a single
// server and returns the resulting cursor.
//
//   server   - server to query
//   num      - passed to the connection's query() as the number to return
//   extra    - additional filter merged into the query via concatQuery();
//              ignored when empty
//   skipLeft - added to the batch size when _batchSize is non-zero
//   lazy     - not used by this implementation
//
// Throws:
//   - uassert 10017 if the cursor is already done
//   - StaleConfigException if setVersion() on the connection returns true,
//     or if _checkCursor() finds the stale-config flag
//   - massert 13633 if the server returned no cursor (and partial results
//     were not requested)
//   - SocketException, rethrown unchanged, unless QueryOption_PartialResults
//     is set
//
// With QueryOption_PartialResults set, a missing cursor or a socket error
// marks this cursor done and yields an empty auto_ptr instead of throwing.
auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft , bool lazy ) {
    uassert( 10017 , "cursor already done" , ! _done );
    assert( _didInit );

    BSONObj q = _query;
    if ( ! extra.isEmpty() ) {
        q = concatQuery( q , extra );
    }

    try {
        ShardConnection conn( server , _ns );

        if ( conn.setVersion() ) {
            conn.done();
            throw StaleConfigException( _ns , "ClusteredCursor::query" , true );
        }

        LOG(5) << "ClusteredCursor::query (" << type() << ") server:" << server
               << " ns:" << _ns << " query:" << q << " num:" << num
               << " _fields:" << _fields << " options: " << _options << endl;

        auto_ptr<DBClientCursor> cursor =
            conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options , _batchSize == 0 ? 0 : _batchSize + skipLeft );

        if ( ! cursor.get() && ( _options & QueryOption_PartialResults ) ) {
            _done = true;
            conn.done();
            return cursor;
        }

        massert( 13633 , str::stream() << "error querying server: " << server , cursor.get() );

        cursor->attach( &conn ); // this calls done on conn
        assert( ! conn.ok() );
        _checkCursor( cursor.get() );
        return cursor;
    }
    catch ( SocketException& ) {
        if ( ! ( _options & QueryOption_PartialResults ) ) {
            // Rethrow the in-flight exception object itself. `throw e;` would
            // copy-construct a new SocketException, slicing any derived type.
            throw;
        }
        _done = true;
        return auto_ptr<DBClientCursor>();
    }
}
|
|
|
|
// Runs an explain of this cursor's query (merged with `extra` when it is
// non-empty) on one server and returns an owned copy of the first document
// of the reply. An empty object is returned when no cursor came back or the
// cursor has no documents.
BSONObj ClusteredCursor::explain( const string& server , BSONObj extra ) {
    BSONObj fullQuery = _query;
    if ( ! extra.isEmpty() )
        fullQuery = concatQuery( fullQuery , extra );

    BSONObj result;

    ShardConnection conn( server , _ns );

    // the number to return is always sent as a non-positive value
    const int nToReturn = abs( _batchSize ) * -1;
    auto_ptr<DBClientCursor> cursor =
        conn->query( _ns , Query( fullQuery ).explain() , nToReturn , 0 , _fields.isEmpty() ? 0 : &_fields );

    const bool haveDoc = cursor.get() && cursor->more();
    if ( haveDoc )
        result = cursor->next().getOwned();

    conn.done();
    return result;
}
|
|
|
|
// Merges `extraFilter` into `query`.
// If `query` has no "query" field it is treated as a plain filter and the two
// are concatenated directly. Otherwise every field is copied through as-is,
// except "query", whose embedded object is concatenated with `extraFilter`.
BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ) {
    if ( ! query.hasField( "query" ) )
        return _concatFilter( query , extraFilter );

    BSONObjBuilder merged;
    for ( BSONObjIterator it( query ); it.more(); ) {
        BSONElement elem = it.next();

        const bool isQueryField = ( strcmp( elem.fieldName() , "query" ) == 0 );
        if ( isQueryField )
            merged.append( "query" , _concatFilter( elem.embeddedObjectUserCheck() , extraFilter ) );
        else
            merged.append( elem );
    }
    return merged.obj();
}
|
|
|
|
// Returns a new object holding all elements of `filter` followed by all
// elements of `extra`.
BSONObj ClusteredCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ) {
    // TODO: ideally, simplify the combined filter here where possible
    BSONObjBuilder combined;
    combined.appendElements( filter );
    combined.appendElements( extra );
    return combined.obj();
}
|
|
|
|
// Builds the aggregate explain document for this clustered cursor.
//
// The result contains:
//   clusteredType - type() of this cursor
//   shards        - sub-object mapping each key produced by _explain() to an
//                   array of the explain documents collected for it
//   n*            - for every field whose name starts with 'n' in any explain
//                   document, the sum of numberLong() over all documents
//   millisTotal   - sum of the "millis" fields
//   millisAvg     - millisTotal / number of explain documents (0 when there
//                   are none)
//   numQueries    - number of explain documents
//   numShards     - number of keys produced by _explain()
BSONObj ClusteredCursor::explain() {
    // Note: by default we filter out allPlans and oldPlan in the shell's
    // explain() function. If you add any recursive structures, make sure to
    // edit the JS to make sure everything gets filtered.

    BSONObjBuilder b;
    b.append( "clusteredType" , type() );

    long long millis = 0;
    int numExplains = 0;

    map<string,long long> counters;

    map<string,list<BSONObj> > out;
    {
        _explain( out );

        BSONObjBuilder x( b.subobjStart( "shards" ) );
        for ( map<string,list<BSONObj> >::iterator i=out.begin(); i!=out.end(); ++i ) {
            // bind by reference: no need to copy the shard name or its list
            const string& shard = i->first;
            list<BSONObj>& l = i->second;
            BSONArrayBuilder y( x.subarrayStart( shard ) );
            for ( list<BSONObj>::iterator j=l.begin(); j!=l.end(); ++j ) {
                BSONObj temp = *j;
                y.append( temp );

                // accumulate every field whose name starts with 'n'
                BSONObjIterator k( temp );
                while ( k.more() ) {
                    BSONElement z = k.next();
                    if ( z.fieldName()[0] != 'n' )
                        continue;
                    long long& c = counters[z.fieldName()];
                    c += z.numberLong();
                }

                millis += temp["millis"].numberLong();
                numExplains++;
            }
            y.done();
        }
        x.done();
    }

    for ( map<string,long long>::iterator i=counters.begin(); i!=counters.end(); ++i )
        b.appendNumber( i->first , i->second );

    b.appendNumber( "millisTotal" , millis );

    // Guard against an empty explain set: dividing by zero would produce
    // NaN/inf, and converting that to int is undefined behaviour.
    const int millisAvg = numExplains > 0 ? (int)( (double)millis / numExplains ) : 0;
    b.append( "millisAvg" , millisAvg );
    b.append( "numQueries" , numExplains );
    b.append( "numShards" , (int)out.size() );

    return b.obj();
}
|
|
|
|
// -------- FilteringClientCursor -----------
|
|
// Creates a filtering cursor with no underlying cursor attached.
// It is marked done until reset() supplies a cursor.
FilteringClientCursor::FilteringClientCursor( const BSONObj filter )
    : _matcher( filter ) {
    _done = true;
}
|
|
|
|
// Creates a filtering cursor that takes ownership of `cursor`.
// The cursor is marked done when no underlying cursor was supplied.
FilteringClientCursor::FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter )
    : _matcher( filter ) , _cursor( cursor ) {
    // Copying an auto_ptr transfers ownership and leaves the source holding
    // null, so `cursor` may already be empty by the time _done is computed.
    // Test the member instead, from the body, so the result does not depend
    // on member declaration order. This matches reset( auto_ptr ).
    _done = ( _cursor.get() == 0 );
}
|
|
|
|
// Creates a filtering cursor wrapping the raw pointer `cursor`, which is
// stored in _cursor. A null pointer yields a cursor that is already done.
FilteringClientCursor::FilteringClientCursor( DBClientCursor* cursor , const BSONObj filter )
    : _matcher( filter ) , _cursor( cursor ) {
    _done = ( cursor == 0 );
}
|
|
|
|
|
|
// Nothing to do explicitly; members clean up after themselves.
FilteringClientCursor::~FilteringClientCursor() {}
|
|
|
|
// Replaces the underlying cursor, taking ownership of `cursor`, and discards
// any buffered document. The cursor is done if the new cursor is null.
void FilteringClientCursor::reset( auto_ptr<DBClientCursor> cursor ) {
    _cursor = cursor;
    _done = ( _cursor.get() == 0 );
    _next = BSONObj();
}
|
|
|
|
// Replaces the underlying cursor with the raw pointer `cursor` (stored in
// _cursor) and discards any buffered document. A null pointer marks the
// cursor done.
void FilteringClientCursor::reset( DBClientCursor* cursor ) {
    _done = ( cursor == 0 );
    _cursor.reset( cursor );
    _next = BSONObj();
}
|
|
|
|
|
|
bool FilteringClientCursor::more() {
|
|
if ( ! _next.isEmpty() )
|
|
return true;
|
|
|
|
if ( _done )
|
|
return false;
|
|
|
|
_advance();
|
|
return ! _next.isEmpty();
|
|
}
|
|
|
|
// Returns the buffered matching document and pre-fetches the following one.
// Callers must have established that a document is buffered (e.g. via
// more()); both preconditions are asserted.
BSONObj FilteringClientCursor::next() {
    assert( ! _next.isEmpty() );
    assert( ! _done );

    // NOTE(review): `current` is only made owned by _advance() when it was
    // the last document of its batch. If _advance() below has to skip
    // non-matching documents past the end of the batch, `current` may refer
    // to the previous batch's buffer - confirm against DBClientCursor.
    BSONObj current = _next;
    _next = BSONObj();
    _advance();
    return current;
}
|
|
|
|
// Returns the next matching document without consuming it.
// If nothing is buffered, the cursor is advanced first; the result is an
// empty object when no further match exists.
BSONObj FilteringClientCursor::peek() {
    if ( ! _next.isEmpty() )
        return _next;

    _advance();
    return _next;
}
|
|
|
|
void FilteringClientCursor::_advance() {
|
|
assert( _next.isEmpty() );
|
|
if ( ! _cursor.get() || _done )
|
|
return;
|
|
|
|
while ( _cursor->more() ) {
|
|
_next = _cursor->next();
|
|
if ( _matcher.matches( _next ) ) {
|
|
if ( ! _cursor->moreInCurrentBatch() )
|
|
_next = _next.getOwned();
|
|
return;
|
|
}
|
|
_next = BSONObj();
|
|
}
|
|
_done = true;
|
|
}
|
|
|
|
// -------- SerialServerClusteredCursor -----------
|
|
|
|
// Creates a cursor that visits the given servers one after another.
//   servers   - the servers (with their extra filters) to query
//   q         - the incoming query message; also supplies the skip count
//   sortOrder - > 0 sorts the server list ascending, < 0 descending,
//               0 leaves it in the set's iteration order
SerialServerClusteredCursor::SerialServerClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , int sortOrder) : ClusteredCursor( q ) {
    set<ServerAndQuery>::const_iterator it = servers.begin();
    while ( it != servers.end() ) {
        _servers.push_back( *it );
        ++it;
    }

    if ( sortOrder < 0 )
        sort( _servers.rbegin() , _servers.rend() );
    else if ( sortOrder > 0 )
        sort( _servers.begin() , _servers.end() );

    _needToSkip = q.ntoskip;
    _serverIndex = 0;
}
|
|
|
|
bool SerialServerClusteredCursor::more() {
|
|
|
|
// TODO: optimize this by sending on first query and then back counting
|
|
// tricky in case where 1st server doesn't have any after
|
|
// need it to send n skipped
|
|
while ( _needToSkip > 0 && _current.more() ) {
|
|
_current.next();
|
|
_needToSkip--;
|
|
}
|
|
|
|
if ( _current.more() )
|
|
return true;
|
|
|
|
if ( _serverIndex >= _servers.size() ) {
|
|
return false;
|
|
}
|
|
|
|
ServerAndQuery& sq = _servers[_serverIndex++];
|
|
|
|
_current.reset( query( sq._server , 0 , sq._extra ) );
|
|
return more();
|
|
}
|
|
|
|
// Returns the next document; fails with uassert 10018 when none is left.
BSONObj SerialServerClusteredCursor::next() {
    const bool haveMore = more();
    uassert( 10018 , "no more items" , haveMore );
    return _current.next();
}
|
|
|
|
// Collects one explain document per configured server and appends it to
// the list stored in `out` under that server's name.
void SerialServerClusteredCursor::_explain( map< string,list<BSONObj> >& out ) {
    const unsigned total = _servers.size();
    for ( unsigned idx = 0; idx < total; ++idx ) {
        ServerAndQuery& entry = _servers[idx];
        // look up (or create) the list before running the explain
        list<BSONObj> & perServer = out[entry._server];
        perServer.push_back( explain( entry._server , entry._extra ) );
    }
}
|
|
|
|
// -------- ParallelSortClusteredCursor -----------
|
|
|
|
// Creates a parallel cursor over `servers` from an incoming query message.
// An owned copy of `sortKey` is kept as the merge order, and the skip count
// comes from the message. _finishCons() completes the setup.
ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q ,
                                                          const BSONObj& sortKey )
    : ClusteredCursor( q ) , _servers( servers ) {
    _needToSkip = q.ntoskip;
    _sortKey = sortKey.getOwned();
    _finishCons();
}
|
|
|
|
// Creates a parallel cursor over `servers` from an explicit namespace and
// Query. The merge order is a copy of the Query's sort specification and
// nothing is skipped. _finishCons() completes the setup.
ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns ,
                                                          const Query& q ,
                                                          int options , const BSONObj& fields )
    : ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ) {
    _needToSkip = 0;
    _sortKey = q.getSort().copy();
    _finishCons();
}
|
|
|
|
void ParallelSortClusteredCursor::_finishCons() {
|
|
_numServers = _servers.size();
|
|
_lastFrom = 0;
|
|
_cursors = 0;
|
|
|
|
if ( ! _sortKey.isEmpty() && ! _fields.isEmpty() ) {
|
|
// we need to make sure the sort key is in the projection
|
|
|
|
set<string> sortKeyFields;
|
|
_sortKey.getFieldNames(sortKeyFields);
|
|
|
|
BSONObjBuilder b;
|
|
bool isNegative = false;
|
|
{
|
|
BSONObjIterator i( _fields );
|
|
while ( i.more() ) {
|
|
BSONElement e = i.next();
|
|
b.append( e );
|
|
|
|
string fieldName = e.fieldName();
|
|
|
|
// exact field
|
|
bool found = sortKeyFields.erase(fieldName);
|
|
|
|
// subfields
|
|
set<string>::const_iterator begin = sortKeyFields.lower_bound(fieldName + ".\x00");
|
|
set<string>::const_iterator end = sortKeyFields.lower_bound(fieldName + ".\xFF");
|
|
sortKeyFields.erase(begin, end);
|
|
|
|
if ( ! e.trueValue() ) {
|
|
uassert( 13431 , "have to have sort key in projection and removing it" , !found && begin == end );
|
|
}
|
|
else if (!e.isABSONObj()) {
|
|
isNegative = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (isNegative) {
|
|
for (set<string>::const_iterator it(sortKeyFields.begin()), end(sortKeyFields.end()); it != end; ++it) {
|
|
b.append(*it, 1);
|
|
}
|
|
}
|
|
|
|
_fields = b.obj();
|
|
}
|
|
}
|
|
|
|
// TODO: Merge with futures API? We do a lot of error checking here that would be useful elsewhere.
|
|
void ParallelSortClusteredCursor::_init() {
|
|
|
|
// log() << "Starting parallel search..." << endl;
|
|
|
|
// make sure we're not already initialized
|
|
assert( ! _cursors );
|
|
_cursors = new FilteringClientCursor[_numServers];
|
|
|
|
bool returnPartial = ( _options & QueryOption_PartialResults );
|
|
|
|
vector<ServerAndQuery> queries( _servers.begin(), _servers.end() );
|
|
set<int> retryQueries;
|
|
int finishedQueries = 0;
|
|
|
|
vector< shared_ptr<ShardConnection> > conns;
|
|
vector<string> servers;
|
|
|
|
// Since we may get all sorts of errors, record them all as they come and throw them later if necessary
|
|
vector<string> staleConfigExs;
|
|
vector<string> socketExs;
|
|
vector<string> otherExs;
|
|
bool allConfigStale = false;
|
|
|
|
int retries = -1;
|
|
|
|
// Loop through all the queries until we've finished or gotten a socket exception on all of them
|
|
// We break early for non-socket exceptions, and socket exceptions if we aren't returning partial results
|
|
do {
|
|
retries++;
|
|
|
|
bool firstPass = retryQueries.size() == 0;
|
|
|
|
if( ! firstPass ){
|
|
log() << "retrying " << ( returnPartial ? "(partial) " : "" ) << "parallel connection to ";
|
|
for( set<int>::iterator it = retryQueries.begin(); it != retryQueries.end(); ++it ){
|
|
log() << queries[*it]._server << ", ";
|
|
}
|
|
log() << finishedQueries << " finished queries." << endl;
|
|
}
|
|
|
|
size_t num = 0;
|
|
for ( vector<ServerAndQuery>::iterator it = queries.begin(); it != queries.end(); ++it ) {
|
|
size_t i = num++;
|
|
|
|
const ServerAndQuery& sq = *it;
|
|
|
|
// If we're not retrying this cursor on later passes, continue
|
|
if( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) continue;
|
|
|
|
// log() << "Querying " << _query << " from " << _ns << " for " << sq._server << endl;
|
|
|
|
BSONObj q = _query;
|
|
if ( ! sq._extra.isEmpty() ) {
|
|
q = concatQuery( q , sq._extra );
|
|
}
|
|
|
|
string errLoc = " @ " + sq._server;
|
|
|
|
if( firstPass ){
|
|
|
|
// This may be the first time connecting to this shard, if so we can get an error here
|
|
try {
|
|
conns.push_back( shared_ptr<ShardConnection>( new ShardConnection( sq._server , _ns ) ) );
|
|
}
|
|
catch( std::exception& e ){
|
|
socketExs.push_back( e.what() + errLoc );
|
|
if( ! returnPartial ){
|
|
num--;
|
|
break;
|
|
}
|
|
conns.push_back( shared_ptr<ShardConnection>() );
|
|
continue;
|
|
}
|
|
|
|
servers.push_back( sq._server );
|
|
}
|
|
|
|
if ( conns[i]->setVersion() ) {
|
|
conns[i]->done();
|
|
staleConfigExs.push_back( (string)"stale config detected for " + StaleConfigException( _ns , "ParallelCursor::_init" , true ).what() + errLoc );
|
|
break;
|
|
}
|
|
|
|
LOG(5) << "ParallelSortClusteredCursor::init server:" << sq._server << " ns:" << _ns
|
|
<< " query:" << q << " _fields:" << _fields << " options: " << _options << endl;
|
|
|
|
if( ! _cursors[i].raw() )
|
|
_cursors[i].reset( new DBClientCursor( conns[i]->get() , _ns , q ,
|
|
0 , // nToReturn
|
|
0 , // nToSkip
|
|
_fields.isEmpty() ? 0 : &_fields , // fieldsToReturn
|
|
_options ,
|
|
// NtoReturn is weird.
|
|
// If zero, it means use default size, so we do that for all cursors
|
|
// If positive, it's the batch size (we don't want this cursor limiting results), tha
|
|
// done at a higher level
|
|
// If negative, it's the batch size, but we don't create a cursor - so we don't want
|
|
// to create a child cursor either.
|
|
// Either way, if non-zero, we want to pull back the batch size + the skip amount as
|
|
// quickly as possible. Potentially, for a cursor on a single shard or if we keep be
|
|
// chunks, we can actually add the skip value into the cursor and/or make some assump
|
|
// return value size ( (batch size + skip amount) / num_servers ).
|
|
_batchSize == 0 ? 0 :
|
|
( _batchSize > 0 ? _batchSize + _needToSkip :
|
|
_batchSize - _needToSkip ) // batchSize
|
|
) );
|
|
|
|
try{
|
|
_cursors[i].raw()->initLazy( ! firstPass );
|
|
}
|
|
catch( SocketException& e ){
|
|
socketExs.push_back( e.what() + errLoc );
|
|
_cursors[i].reset( NULL );
|
|
conns[i]->done();
|
|
if( ! returnPartial ) break;
|
|
}
|
|
catch( std::exception& e){
|
|
otherExs.push_back( e.what() + errLoc );
|
|
_cursors[i].reset( NULL );
|
|
conns[i]->done();
|
|
break;
|
|
}
|
|
|
|
}
|
|
|
|
// Go through all the potentially started cursors and finish initializing them or log any errors and
|
|
// potentially retry
|
|
// TODO: Better error classification would make this easier, errors are indicated in all sorts of ways
|
|
// here that we need to trap.
|
|
for ( size_t i = 0; i < num; i++ ) {
|
|
|
|
// log() << "Finishing query for " << cons[i].get()->getHost() << endl;
|
|
string errLoc = " @ " + queries[i]._server;
|
|
|
|
if( ! _cursors[i].raw() || ( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) ){
|
|
if( conns[i] ) conns[i].get()->done();
|
|
continue;
|
|
}
|
|
|
|
assert( conns[i] );
|
|
retryQueries.erase( i );
|
|
|
|
bool retry = false;
|
|
|
|
try {
|
|
|
|
if( ! _cursors[i].raw()->initLazyFinish( retry ) ) {
|
|
|
|
warning() << "invalid result from " << conns[i]->getHost() << ( retry ? ", retrying" : "" ) << endl;
|
|
_cursors[i].reset( NULL );
|
|
|
|
if( ! retry ){
|
|
socketExs.push_back( str::stream() << "error querying server: " << servers[i] );
|
|
conns[i]->done();
|
|
}
|
|
else {
|
|
retryQueries.insert( i );
|
|
}
|
|
|
|
continue;
|
|
}
|
|
}
|
|
catch ( MsgAssertionException& e ){
|
|
socketExs.push_back( e.what() + errLoc );
|
|
_cursors[i].reset( NULL );
|
|
conns[i]->done();
|
|
continue;
|
|
}
|
|
catch ( SocketException& e ) {
|
|
socketExs.push_back( e.what() + errLoc );
|
|
_cursors[i].reset( NULL );
|
|
conns[i]->done();
|
|
continue;
|
|
}
|
|
catch( std::exception& e ){
|
|
otherExs.push_back( e.what() + errLoc );
|
|
_cursors[i].reset( NULL );
|
|
conns[i]->done();
|
|
continue;
|
|
}
|
|
|
|
try {
|
|
_cursors[i].raw()->attach( conns[i].get() ); // this calls done on conn
|
|
_checkCursor( _cursors[i].raw() );
|
|
|
|
finishedQueries++;
|
|
}
|
|
catch ( StaleConfigException& e ){
|
|
|
|
// Our stored configuration data is actually stale, we need to reload it
|
|
// when we throw our exception
|
|
allConfigStale = true;
|
|
|
|
staleConfigExs.push_back( (string)"stale config detected for " + e.what() + errLoc );
|
|
_cursors[i].reset( NULL );
|
|
conns[i]->done();
|
|
continue;
|
|
}
|
|
catch( std::exception& e ){
|
|
otherExs.push_back( e.what() + errLoc );
|
|
_cursors[i].reset( NULL );
|
|
conns[i]->done();
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// Don't exceed our max retries, should not happen
|
|
assert( retries < 5 );
|
|
}
|
|
while( retryQueries.size() > 0 /* something to retry */ &&
|
|
( socketExs.size() == 0 || returnPartial ) /* no conn issues */ &&
|
|
staleConfigExs.size() == 0 /* no config issues */ &&
|
|
otherExs.size() == 0 /* no other issues */);
|
|
|
|
// Assert that our conns are all closed!
|
|
for( vector< shared_ptr<ShardConnection> >::iterator i = conns.begin(); i < conns.end(); ++i ){
|
|
assert( ! (*i) || ! (*i)->ok() );
|
|
}
|
|
|
|
// Handle errors we got during initialization.
|
|
// If we're returning partial results, we can ignore socketExs, but nothing else
|
|
// Log a warning in any case, so we don't lose these messages
|
|
bool throwException = ( socketExs.size() > 0 && ! returnPartial ) || staleConfigExs.size() > 0 || otherExs.size() > 0;
|
|
|
|
if( socketExs.size() > 0 || staleConfigExs.size() > 0 || otherExs.size() > 0 ) {
|
|
|
|
vector<string> errMsgs;
|
|
|
|
errMsgs.insert( errMsgs.end(), staleConfigExs.begin(), staleConfigExs.end() );
|
|
errMsgs.insert( errMsgs.end(), otherExs.begin(), otherExs.end() );
|
|
errMsgs.insert( errMsgs.end(), socketExs.begin(), socketExs.end() );
|
|
|
|
stringstream errMsg;
|
|
errMsg << "could not initialize cursor across all shards because : ";
|
|
for( vector<string>::iterator i = errMsgs.begin(); i != errMsgs.end(); i++ ){
|
|
if( i != errMsgs.begin() ) errMsg << " :: and :: ";
|
|
errMsg << *i;
|
|
}
|
|
|
|
if( throwException && staleConfigExs.size() > 0 )
|
|
throw StaleConfigException( _ns , errMsg.str() , ! allConfigStale );
|
|
else if( throwException )
|
|
throw DBException( errMsg.str(), 14827 );
|
|
else
|
|
warning() << errMsg.str() << endl;
|
|
}
|
|
|
|
if( retries > 0 )
|
|
log() << "successfully finished parallel query after " << retries << " retries" << endl;
|
|
|
|
}
|
|
|
|
// Frees the per-server cursor array allocated by _init() (if any).
ParallelSortClusteredCursor::~ParallelSortClusteredCursor() {
    delete [] _cursors;
    _cursors = NULL;
}
|
|
|
|
bool ParallelSortClusteredCursor::more() {
|
|
|
|
if ( _needToSkip > 0 ) {
|
|
int n = _needToSkip;
|
|
_needToSkip = 0;
|
|
|
|
while ( n > 0 && more() ) {
|
|
BSONObj x = next();
|
|
n--;
|
|
}
|
|
|
|
_needToSkip = n;
|
|
}
|
|
|
|
for ( int i=0; i<_numServers; i++ ) {
|
|
if ( _cursors[i].more() )
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
// Returns the next document in merge order across all per-server cursors.
// With an empty sort key, the first cursor (round-robin from the one after
// the last used) that has a document wins. Otherwise the heads of all
// cursors are compared with woSortOrder() and the best one is consumed.
// Fails with uassert 10019 when no cursor has a document left.
BSONObj ParallelSortClusteredCursor::next() {
    BSONObj best;
    int bestFrom = -1;

    // Visit every server exactly once, starting one past the server used
    // last time (so the very first call starts at server #1, not #0).
    for ( int step = 1; step <= _numServers; ++step ) {

        const int idx = ( step + _lastFrom ) % _numServers;

        if ( ! _cursors[idx].more() )
            continue;

        BSONObj candidate = _cursors[idx].peek();

        if ( best.isEmpty() ) {
            best = candidate;
            bestFrom = idx;
            // unsorted: the first available document is good enough
            if( _sortKey.isEmpty() ) break;
            continue;
        }

        // keep the current best only when it compares strictly lower
        const int comp = best.woSortOrder( candidate , _sortKey , true );
        if ( comp >= 0 ) {
            best = candidate;
            bestFrom = idx;
        }
    }

    _lastFrom = bestFrom;

    uassert( 10019 , "no more elements" , ! best.isEmpty() );
    _cursors[bestFrom].next();

    return best;
}
|
|
|
|
// Collects one explain document per configured server and appends it to
// the list stored in `out` under that server's name.
void ParallelSortClusteredCursor::_explain( map< string,list<BSONObj> >& out ) {
    set<ServerAndQuery>::iterator it = _servers.begin();
    while ( it != _servers.end() ) {
        const ServerAndQuery& entry = *it;
        // look up (or create) the list before running the explain
        list<BSONObj> & perServer = out[entry._server];
        perServer.push_back( explain( entry._server , entry._extra ) );
        ++it;
    }
}
|
|
|
|
// -----------------
|
|
// ---- Future -----
|
|
// -----------------
|
|
|
|
// Starts running `cmd` against `db` on `server`.
//   conn - connection to use; when null, a ScopedDbConnection to `server`
//          is opened and held by this object.
// If the connection supports lazy operation, the command is only sent here
// and join() collects the reply. Otherwise the command runs to completion
// immediately. Any std::exception is logged and leaves the result marked
// done and not ok.
Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn )
    :_server(server) ,_db(db) , _options(options), _cmd(cmd) ,_conn(conn) ,_done(false)
{
    try {
        if ( ! _conn ){
            _connHolder.reset( new ScopedDbConnection( _server ) );
            _conn = _connHolder->get();
        }

        if ( ! _conn->lazySupported() ) {
            // synchronous path
            _done = true; // we set _done first because even if there is an error we're done
            _ok = _conn->runCommand( db , cmd , _res , options );
        }
        else {
            // lazy path: send now, read the reply in join()
            _cursor.reset( new DBClientCursor(_conn, _db + ".$cmd", _cmd, -1/*limit*/, 0, NULL, _options, 0));
            _cursor->initLazy();
        }
    }
    catch ( std::exception& e ) {
        error() << "Future::spawnComand (part 1) exception: " << e.what() << endl;
        _ok = false;
        _done = true;
    }
}
|
|
|
|
bool Future::CommandResult::join() {
|
|
if (_done)
|
|
return _ok;
|
|
|
|
try {
|
|
// TODO: Allow retries?
|
|
bool retry = false;
|
|
bool finished = _cursor->initLazyFinish( retry );
|
|
|
|
// Shouldn't need to communicate with server any more
|
|
if ( _connHolder )
|
|
_connHolder->done();
|
|
|
|
uassert(14812, str::stream() << "Error running command on server: " << _server, finished);
|
|
massert(14813, "Command returned nothing", _cursor->more());
|
|
|
|
_res = _cursor->nextSafe();
|
|
_ok = _res["ok"].trueValue();
|
|
|
|
}
|
|
catch ( std::exception& e ) {
|
|
error() << "Future::spawnComand (part 2) exception: " << e.what() << endl;
|
|
_ok = false;
|
|
}
|
|
|
|
_done = true;
|
|
return _ok;
|
|
}
|
|
|
|
// Creates a CommandResult for the given command and returns it wrapped in
// a shared_ptr. The arguments are forwarded unchanged to the CommandResult
// constructor.
shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ) {
    return shared_ptr<Future::CommandResult>( new Future::CommandResult( server , db , cmd , options , conn ) );
}
|
|
|
|
}
|