mongo/client/parallel.cpp
2012-06-27 15:36:51 -04:00

805 lines
28 KiB
C++

// parallel.cpp
/*
* Copyright 2010 10gen Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "pch.h"
#include "parallel.h"
#include "connpool.h"
#include "../db/queryutil.h"
#include "../db/dbmessage.h"
#include "../s/util.h"
#include "../s/shard.h"
namespace mongo {
// -------- ClusteredCursor -----------
/**
 * Builds a cursor from an incoming query message.
 * The query and field-selector objects are copied out of the message.
 */
ClusteredCursor::ClusteredCursor( QueryMessage& q ) {
    _done = false;
    _didInit = false;

    _ns = q.ns;
    _options = q.queryOptions;
    _query = q.query.copy();
    _fields = q.fields.copy();

    // A requested size of exactly 1 is bumped to 2; any other value is used unchanged.
    _batchSize = ( q.ntoreturn == 1 ) ? 2 : q.ntoreturn;
}
/**
 * Builds a cursor from explicit parts rather than a query message.
 * Owned copies of `q` and `fields` are stored; the batch size starts at 0.
 */
ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ) {
    _done = false;
    _didInit = false;
    _batchSize = 0;

    _ns = ns;
    _options = options;
    _query = q.getOwned();
    _fields = fields.getOwned();
}
/** Marks the cursor as done on destruction. */
ClusteredCursor::~ClusteredCursor() {
    // Defensive only: flag the cursor as finished while it is torn down.
    _done = true;
}
/**
 * Runs _init() the first time it is called; later calls do nothing.
 */
void ClusteredCursor::init() {
    if ( ! _didInit ) {
        // The flag is raised before _init() runs, so it stays set even if _init() throws.
        _didInit = true;
        _init();
    }
}
/**
 * Inspects the result flags of a freshly opened cursor and converts error
 * conditions into exceptions.
 *
 * @param cursor  cursor to inspect; must be non-null (asserted).
 * @throws StaleConfigException  if the ShardConfigStale flag is set.
 * @throws UserException         if the ErrSet flag is set; code and message are
 *                               read from the "code" and "$err" fields of the
 *                               cursor's next document.
 */
void ClusteredCursor::_checkCursor( DBClientCursor * cursor ) {
    assert( cursor );

    if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) )
        throw StaleConfigException( _ns , "ClusteredCursor::_checkCursor" );

    if ( ! cursor->hasResultFlag( ResultFlag_ErrSet ) )
        return;

    BSONObj errDoc = cursor->next();
    throw UserException( errDoc["code"].numberInt() , errDoc["$err"].String() );
}
/**
 * Runs this cursor's query against a single server and returns the resulting cursor.
 *
 * @param server    server to connect to.
 * @param num       passed through to the connection's query() as the number to return.
 * @param extra     optional extra filter; when non-empty it is merged into the
 *                  stored query via concatQuery().
 * @param skipLeft  added to the batch size when the batch size is non-zero.
 * @param lazy      not used by this implementation.
 *
 * @return the opened cursor, already attached to its connection. A null auto_ptr
 *         is returned (and the cursor is marked done) when QueryOption_PartialResults
 *         is set and either the server returned no cursor or a SocketException
 *         was caught.
 *
 * @throws StaleConfigException  if setVersion() on the connection reports a change,
 *                               or via _checkCursor().
 * @throws SocketException       rethrown unchanged when partial results are not allowed.
 * Asserts (uassert 10017) that the cursor is not already done and (massert 13633)
 * that the server returned a cursor when partial results are not allowed.
 */
auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft , bool lazy ) {
    uassert( 10017 , "cursor already done" , ! _done );
    assert( _didInit );

    BSONObj q = _query;
    if ( ! extra.isEmpty() ) {
        q = concatQuery( q , extra );
    }

    try {
        ShardConnection conn( server , _ns );

        if ( conn.setVersion() ) {
            conn.done();
            throw StaleConfigException( _ns , "ClusteredCursor::query" , true );
        }

        LOG(5) << "ClusteredCursor::query (" << type() << ") server:" << server
               << " ns:" << _ns << " query:" << q << " num:" << num
               << " _fields:" << _fields << " options: " << _options << endl;

        auto_ptr<DBClientCursor> cursor =
            conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options , _batchSize == 0 ? 0 : _batchSize + skipLeft );

        if ( ! cursor.get() && _options & QueryOption_PartialResults ) {
            // No cursor from this server, but the caller accepts partial results.
            _done = true;
            conn.done();
            return cursor;
        }

        massert( 13633 , str::stream() << "error querying server: " << server , cursor.get() );

        cursor->attach( &conn ); // this calls done on conn
        assert( ! conn.ok() );
        _checkCursor( cursor.get() );
        return cursor;
    }
    catch ( SocketException& ) {
        // Rethrow the original exception object. `throw e;` would copy-construct a
        // new SocketException and slice off any more-derived type.
        if ( ! ( _options & QueryOption_PartialResults ) )
            throw;
        _done = true;
        return auto_ptr<DBClientCursor>();
    }
}
/**
 * Runs an explain of this cursor's query on one server.
 *
 * @param server  server to run the explain on.
 * @param extra   optional extra filter merged into the stored query.
 * @return an owned copy of the first document the explain cursor yields, or an
 *         empty object if there was no cursor or no document.
 */
BSONObj ClusteredCursor::explain( const string& server , BSONObj extra ) {
    BSONObj explainQuery = _query;
    if ( ! extra.isEmpty() )
        explainQuery = concatQuery( explainQuery , extra );

    BSONObj result;

    ShardConnection conn( server , _ns );

    // The number-to-return argument is always the non-positive form of the batch size.
    auto_ptr<DBClientCursor> cursor = conn->query( _ns ,
                                                   Query( explainQuery ).explain() ,
                                                   abs( _batchSize ) * -1 ,
                                                   0 ,
                                                   _fields.isEmpty() ? 0 : &_fields );

    const bool haveDoc = cursor.get() && cursor->more();
    if ( haveDoc )
        result = cursor->next().getOwned();

    conn.done();
    return result;
}
/**
 * Merges an extra filter into a query object.
 *
 * If `query` has no top-level "query" field, the whole object is treated as the
 * filter. Otherwise every top-level element is copied unchanged except "query",
 * whose embedded object is combined with `extraFilter`.
 */
BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ) {
    if ( ! query.hasField( "query" ) )
        return _concatFilter( query , extraFilter );

    BSONObjBuilder merged;
    for ( BSONObjIterator it( query ); it.more(); ) {
        BSONElement elem = it.next();
        const bool isQueryField = ( strcmp( elem.fieldName() , "query" ) == 0 );
        if ( isQueryField )
            merged.append( "query" , _concatFilter( elem.embeddedObjectUserCheck() , extraFilter ) );
        else
            merged.append( elem );
    }
    return merged.obj();
}
/**
 * Returns a new object holding all elements of `filter` followed by all
 * elements of `extra`.
 */
BSONObj ClusteredCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ) {
    // TODO: ideally the combined filter should be simplified here where possible
    BSONObjBuilder combined;
    combined.appendElements( filter );
    combined.appendElements( extra );
    return combined.obj();
}
/**
 * Builds an aggregate explain document across all shards.
 *
 * The result contains:
 *   - "clusteredType": the value of type()
 *   - "shards": one array per shard holding the explain documents from _explain()
 *   - one summed counter for every explain field whose name starts with 'n'
 *   - "millisTotal", "millisAvg", "numQueries", "numShards"
 *
 * "millisAvg" is reported as 0 when no explain documents were collected.
 */
BSONObj ClusteredCursor::explain() {
    // Note: by default we filter out allPlans and oldPlan in the shell's
    // explain() function.  If you add any recursive structures, make sure to
    // edit the JS to make sure everything gets filtered.

    BSONObjBuilder b;
    b.append( "clusteredType" , type() );

    long long millis = 0;
    double numExplains = 0;

    map<string,long long> counters;

    map<string,list<BSONObj> > out;
    {
        _explain( out );

        BSONObjBuilder x( b.subobjStart( "shards" ) );
        for ( map<string,list<BSONObj> >::const_iterator i=out.begin(); i!=out.end(); ++i ) {
            // Bind by reference; the per-shard name and list do not need to be copied.
            const string& shard = i->first;
            const list<BSONObj>& l = i->second;
            BSONArrayBuilder y( x.subarrayStart( shard ) );
            for ( list<BSONObj>::const_iterator j=l.begin(); j!=l.end(); ++j ) {
                BSONObj temp = *j;
                y.append( temp );

                // Sum every field whose name begins with 'n' across all explains.
                BSONObjIterator k( temp );
                while ( k.more() ) {
                    BSONElement z = k.next();
                    if ( z.fieldName()[0] != 'n' )
                        continue;
                    long long& c = counters[z.fieldName()];
                    c += z.numberLong();
                }

                millis += temp["millis"].numberLong();
                numExplains++;
            }
            y.done();
        }
        x.done();
    }

    for ( map<string,long long>::iterator i=counters.begin(); i!=counters.end(); ++i )
        b.appendNumber( i->first , i->second );

    b.appendNumber( "millisTotal" , millis );
    // Guard the average: with zero explains the division yields NaN, and
    // converting NaN to int is undefined behaviour.
    const int millisAvg = ( numExplains > 0 ) ? (int)((double)millis / numExplains ) : 0;
    b.append( "millisAvg" , millisAvg );
    b.append( "numQueries" , (int)numExplains );
    b.append( "numShards" , (int)out.size() );

    return b.obj();
}
// -------- FilteringClientCursor -----------
/**
 * Creates a filtering cursor with no underlying cursor.
 * It starts out done until reset() supplies one.
 */
FilteringClientCursor::FilteringClientCursor( const BSONObj filter )
    : _matcher( filter ) ,
      _done( true )
{
}
/**
 * Creates a filtering cursor that takes ownership of `cursor`.
 *
 * @param cursor  underlying cursor; may be empty, in which case this cursor
 *                starts out done.
 * @param filter  filter used to build the matcher applied to each document.
 */
FilteringClientCursor::FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter )
    : _matcher( filter ) , _cursor( cursor ) , _done( false ) {
    // Copying an auto_ptr empties the source, so `cursor` may already be null by
    // the time _done is initialised (this depends on member declaration order).
    // Derive the flag from the member that now owns the pointer, as reset() does.
    _done = ( _cursor.get() == 0 );
}
/**
 * Creates a filtering cursor around a raw cursor pointer.
 * The pointer is stored in _cursor; a null pointer makes this cursor start out done.
 */
FilteringClientCursor::FilteringClientCursor( DBClientCursor* cursor , const BSONObj filter )
    : _matcher( filter ) ,
      _cursor( cursor ) ,
      _done( cursor == 0 )
{
}
/** No explicit cleanup is performed here. */
FilteringClientCursor::~FilteringClientCursor()
{
}
/**
 * Replaces the underlying cursor, taking ownership of `cursor`, and discards
 * any buffered document. An empty `cursor` leaves this cursor done.
 */
void FilteringClientCursor::reset( auto_ptr<DBClientCursor> cursor ) {
    _cursor = cursor;
    _next = BSONObj();
    // Test the member, not the parameter: the assignment above emptied `cursor`.
    const bool haveCursor = ( _cursor.get() != 0 );
    _done = ! haveCursor;
}
/**
 * Replaces the underlying cursor with a raw pointer and discards any buffered
 * document. A null pointer leaves this cursor done.
 */
void FilteringClientCursor::reset( DBClientCursor* cursor ) {
    _cursor.reset( cursor );
    _next = BSONObj();
    const bool haveCursor = ( cursor != 0 );
    _done = ! haveCursor;
}
/**
 * Reports whether a matching document is available, fetching one from the
 * underlying cursor if nothing is buffered yet.
 */
bool FilteringClientCursor::more() {
    if ( _next.isEmpty() ) {
        // Nothing buffered: either we are finished or we try to fetch a match.
        if ( _done )
            return false;
        _advance();
    }
    return ! _next.isEmpty();
}
/**
 * Returns the buffered matching document and advances to the next one.
 * A document must already be buffered and the cursor must not be done (both asserted).
 */
BSONObj FilteringClientCursor::next() {
    assert( ! _next.isEmpty() );
    assert( ! _done );

    BSONObj current = _next;

    // _advance() requires an empty buffer, so clear it before fetching.
    _next = BSONObj();
    _advance();

    return current;
}
/**
 * Returns the next matching document without consuming it, fetching one if
 * none is buffered. The result is empty when nothing could be fetched.
 */
BSONObj FilteringClientCursor::peek() {
    const bool nothingBuffered = _next.isEmpty();
    if ( nothingBuffered )
        _advance();
    return _next;
}
/**
 * Reads documents from the underlying cursor until one passes the matcher and
 * stores it in _next. When the underlying cursor runs out, _next is left empty
 * and the cursor is marked done. Requires _next to be empty on entry (asserted).
 */
void FilteringClientCursor::_advance() {
    assert( _next.isEmpty() );

    if ( _done || ! _cursor.get() )
        return;

    while ( _cursor->more() ) {
        _next = _cursor->next();

        if ( ! _matcher.matches( _next ) ) {
            // Not a match: drop it and keep scanning.
            _next = BSONObj();
            continue;
        }

        // Last document of the current batch: keep an owned copy.
        // NOTE(review): presumably the batch buffer can be replaced later - confirm.
        if ( ! _cursor->moreInCurrentBatch() )
            _next = _next.getOwned();
        return;
    }

    _done = true;
}
// -------- SerialServerClusteredCursor -----------
/**
 * Builds a cursor that queries the given servers one after another.
 *
 * @param servers    servers (with per-server extra filters) to visit.
 * @param q          incoming query message; its ntoskip becomes the skip count.
 * @param sortOrder  > 0 sorts the servers ascending, < 0 descending,
 *                   0 keeps the iteration order of `servers`.
 */
SerialServerClusteredCursor::SerialServerClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , int sortOrder) : ClusteredCursor( q ) {
    _servers.insert( _servers.end() , servers.begin() , servers.end() );

    if ( sortOrder < 0 ) {
        // Sorting through reverse iterators produces descending order.
        sort( _servers.rbegin() , _servers.rend() );
    }
    else if ( sortOrder > 0 ) {
        sort( _servers.begin() , _servers.end() );
    }

    _needToSkip = q.ntoskip;
    _serverIndex = 0;
}
bool SerialServerClusteredCursor::more() {
// TODO: optimize this by sending on first query and then back counting
// tricky in case where 1st server doesn't have any after
// need it to send n skipped
while ( _needToSkip > 0 && _current.more() ) {
_current.next();
_needToSkip--;
}
if ( _current.more() )
return true;
if ( _serverIndex >= _servers.size() ) {
return false;
}
ServerAndQuery& sq = _servers[_serverIndex++];
_current.reset( query( sq._server , 0 , sq._extra ) );
return more();
}
/**
 * Returns the next document.
 * Fails with uassert 10018 ("no more items") when more() reports none are left.
 */
BSONObj SerialServerClusteredCursor::next() {
    const bool haveMore = more();
    uassert( 10018 , "no more items" , haveMore );
    return _current.next();
}
/**
 * Appends one explain document per configured server to `out`, keyed by server name.
 */
void SerialServerClusteredCursor::_explain( map< string,list<BSONObj> >& out ) {
    const unsigned total = _servers.size();
    for ( unsigned idx = 0; idx < total; ++idx ) {
        ServerAndQuery& entry = _servers[idx];
        // Look up (or create) the server's list before running the explain.
        list<BSONObj> & perServer = out[entry._server];
        perServer.push_back( explain( entry._server , entry._extra ) );
    }
}
// -------- ParallelSortClusteredCursor -----------
/**
 * Builds a parallel cursor from an incoming query message.
 *
 * @param servers  servers (with per-server extra filters) to query.
 * @param q        incoming query message; its ntoskip becomes the skip count.
 * @param sortKey  sort specification used to merge results; an owned copy is kept.
 */
ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q ,
        const BSONObj& sortKey )
    : ClusteredCursor( q ) ,
      _servers( servers ) {
    _needToSkip = q.ntoskip;
    _sortKey = sortKey.getOwned();
    _finishCons();
}
/**
 * Builds a parallel cursor from explicit parts.
 * The sort key is copied from the Query's sort specification and nothing is skipped.
 */
ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns ,
        const Query& q ,
        int options , const BSONObj& fields )
    : ClusteredCursor( ns , q.obj , options , fields ) ,
      _servers( servers ) {
    _needToSkip = 0;
    _sortKey = q.getSort().copy();
    _finishCons();
}
/**
 * Shared tail of the constructors: resets bookkeeping and, when both a sort key
 * and a projection are present, rewrites the projection so it still contains
 * the sort key fields.
 *
 * Fails with uassert 13431 if the projection gives a falsy value to a sort key
 * field or to a parent of one.
 */
void ParallelSortClusteredCursor::_finishCons() {
    _numServers = _servers.size();
    _lastFrom = 0;
    _cursors = 0;

    if ( ! _sortKey.isEmpty() && ! _fields.isEmpty() ) {
        // we need to make sure the sort key is in the projection

        // Sort key fields not yet accounted for by the projection.
        set<string> sortKeyFields;
        _sortKey.getFieldNames(sortKeyFields);

        BSONObjBuilder b;
        // Set when the projection contains a truthy, non-object value;
        // only then are the missing sort key fields appended below.
        bool appendMissingSortFields = false;
        {
            BSONObjIterator i( _fields );
            while ( i.more() ) {
                BSONElement e = i.next();
                b.append( e );

                string fieldName = e.fieldName();

                // exact field
                bool found = sortKeyFields.erase(fieldName);

                // subfields
                // Note: the embedded NUL ends the C string literal, so the lower
                // bound is effectively fieldName + ".".
                set<string>::const_iterator begin = sortKeyFields.lower_bound(fieldName + ".\x00");
                set<string>::const_iterator end   = sortKeyFields.lower_bound(fieldName + ".\xFF");

                // Record this BEFORE erasing: erase() invalidates `begin` when the
                // range is non-empty, so comparing it afterwards is undefined.
                const bool foundSubfields = ( begin != end );
                sortKeyFields.erase(begin, end);

                if ( ! e.trueValue() ) {
                    uassert( 13431 , "have to have sort key in projection and removing it" , !found && !foundSubfields );
                }
                else if (!e.isABSONObj()) {
                    appendMissingSortFields = true;
                }
            }
        }

        if (appendMissingSortFields) {
            for (set<string>::const_iterator it(sortKeyFields.begin()), end(sortKeyFields.end()); it != end; ++it) {
                b.append(*it, 1);
            }
        }

        _fields = b.obj();
    }
}
// TODO: Merge with futures API? We do a lot of error checking here that would be useful elsewhere.
/**
 * Opens one cursor per configured server.
 *
 * Each pass has two phases: first the queries are started lazily on every
 * server that still needs one, then each started query is finished, attached to
 * its connection and checked. Queries whose lazy finish asks for a retry are
 * attempted again on the next pass.
 *
 * Errors are collected as strings in three groups (stale config, socket, other)
 * and reported after the loop:
 *   - any stale-config error  -> throws StaleConfigException
 *   - any other error, or a socket error when partial results are not allowed
 *                             -> throws DBException with code 14827
 *   - socket errors only, with QueryOption_PartialResults set
 *                             -> logged as a warning; the affected cursors stay empty
 *
 * Must be called at most once per object (asserts that _cursors is still null).
 */
void ParallelSortClusteredCursor::_init() {

    // log() << "Starting parallel search..." << endl;

    // make sure we're not already initialized
    assert( ! _cursors );
    _cursors = new FilteringClientCursor[_numServers];

    bool returnPartial = ( _options & QueryOption_PartialResults );

    // Indexable copy of _servers; index i is shared by queries, conns and _cursors.
    vector<ServerAndQuery> queries( _servers.begin(), _servers.end() );
    set<int> retryQueries;
    int finishedQueries = 0;

    vector< shared_ptr<ShardConnection> > conns;

    // Since we may get all sorts of errors, record them all as they come and throw them later if necessary
    vector<string> staleConfigExs;
    vector<string> socketExs;
    vector<string> otherExs;
    bool allConfigStale = false;

    int retries = -1;

    // Loop through all the queries until we've finished or gotten a socket exception on all of them
    // We break early for non-socket exceptions, and socket exceptions if we aren't returning partial results
    do {
        retries++;

        bool firstPass = retryQueries.size() == 0;

        if( ! firstPass ){
            log() << "retrying " << ( returnPartial ? "(partial) " : "" ) << "parallel connection to ";
            for( set<int>::iterator it = retryQueries.begin(); it != retryQueries.end(); ++it ){
                log() << queries[*it]._server << ", ";
            }
            log() << finishedQueries << " finished queries." << endl;
        }

        // Number of queries visited in this pass; bounds the finishing loop below.
        size_t num = 0;
        for ( vector<ServerAndQuery>::iterator it = queries.begin(); it != queries.end(); ++it ) {
            size_t i = num++;

            const ServerAndQuery& sq = *it;

            // If we're not retrying this cursor on later passes, continue
            if( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) continue;

            // log() << "Querying " << _query << " from " << _ns << " for " << sq._server << endl;

            BSONObj q = _query;
            if ( ! sq._extra.isEmpty() ) {
                q = concatQuery( q , sq._extra );
            }

            string errLoc = " @ " + sq._server;

            if( firstPass ){

                // This may be the first time connecting to this shard, if so we can get an error here
                try {
                    conns.push_back( shared_ptr<ShardConnection>( new ShardConnection( sq._server , _ns ) ) );
                }
                catch( std::exception& e ){
                    socketExs.push_back( e.what() + errLoc );
                    if( ! returnPartial ){
                        // No conns entry exists for this server, so leave it out of the finishing loop.
                        num--;
                        break;
                    }
                    // Keep conns index-aligned with queries by storing a null placeholder.
                    conns.push_back( shared_ptr<ShardConnection>() );
                    continue;
                }
            }

            if ( conns[i]->setVersion() ) {
                conns[i]->done();
                staleConfigExs.push_back( (string)"stale config detected for " + StaleConfigException( _ns , "ParallelCursor::_init" , true ).what() + errLoc );
                break;
            }

            LOG(5) << "ParallelSortClusteredCursor::init server:" << sq._server << " ns:" << _ns
                   << " query:" << q << " _fields:" << _fields << " options: " << _options  << endl;

            if( ! _cursors[i].raw() )
                _cursors[i].reset( new DBClientCursor( conns[i]->get() , _ns , q ,
                                                       0 , // nToReturn
                                                       0 , // nToSkip
                                                       _fields.isEmpty() ? 0 : &_fields , // fieldsToReturn
                                                       _options ,
                                                       // The batch size argument:
                                                       // If zero, it means use default size, so we do that for all cursors.
                                                       // If positive, it's the batch size (we don't want this cursor limiting
                                                       // results); limiting is done at a higher level.
                                                       // If negative, it's the batch size, but we don't create a cursor - so we
                                                       // don't want to create a child cursor either.
                                                       // Either way, if non-zero, we want to pull back the batch size + the skip
                                                       // amount as quickly as possible.
                                                       // Possible future optimization: for a cursor on a single shard the skip
                                                       // value could be pushed into the cursor, or the per-server size estimated
                                                       // as ( (batch size + skip amount) / num_servers ).
                                                       _batchSize == 0 ? 0 :
                                                       ( _batchSize > 0 ? _batchSize + _needToSkip :
                                                         _batchSize - _needToSkip ) // batchSize
                                                     ) );

            try{
                _cursors[i].raw()->initLazy( ! firstPass );
            }
            catch( SocketException& e ){
                socketExs.push_back( e.what() + errLoc );
                _cursors[i].reset( NULL );
                conns[i]->done();
                if( ! returnPartial ) break;
            }
            catch( std::exception& e){
                otherExs.push_back( e.what() + errLoc );
                _cursors[i].reset( NULL );
                conns[i]->done();
                break;
            }

        }

        // Go through all the potentially started cursors and finish initializing them or log any errors and
        // potentially retry
        // TODO:  Better error classification would make this easier, errors are indicated in all sorts of ways
        // here that we need to trap.
        for ( size_t i = 0; i < num; i++ ) {

            // log() << "Finishing query for " << cons[i].get()->getHost() << endl;
            string errLoc = " @ " + queries[i]._server;

            // Nothing to finish: no cursor was started, or this query is not part of this retry pass.
            if( ! _cursors[i].raw() || ( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) ){
                if( conns[i] ) conns[i].get()->done();
                continue;
            }

            assert( conns[i] );
            retryQueries.erase( i );

            bool retry = false;

            try {

                if( ! _cursors[i].raw()->initLazyFinish( retry ) ) {

                    warning() << "invalid result from " << conns[i]->getHost() << ( retry ? ", retrying" : "" ) << endl;
                    _cursors[i].reset( NULL );

                    if( ! retry ){
                        // Name the server via queries[i]. A separate list of only the
                        // successfully connected servers is not index-aligned with i
                        // once any connection attempt has failed.
                        socketExs.push_back( str::stream() << "error querying server: " << queries[i]._server );
                        conns[i]->done();
                    }
                    else {
                        retryQueries.insert( i );
                    }

                    continue;
                }
            }
            catch ( MsgAssertionException& e ){
                socketExs.push_back( e.what() + errLoc );
                _cursors[i].reset( NULL );
                conns[i]->done();
                continue;
            }
            catch ( SocketException& e ) {
                socketExs.push_back( e.what() + errLoc );
                _cursors[i].reset( NULL );
                conns[i]->done();
                continue;
            }
            catch( std::exception& e ){
                otherExs.push_back( e.what() + errLoc );
                _cursors[i].reset( NULL );
                conns[i]->done();
                continue;
            }

            try {
                _cursors[i].raw()->attach( conns[i].get() ); // this calls done on conn
                _checkCursor( _cursors[i].raw() );

                finishedQueries++;
            }
            catch ( StaleConfigException& e ){

                // Our stored configuration data is actually stale, we need to reload it
                // when we throw our exception
                allConfigStale = true;

                staleConfigExs.push_back( (string)"stale config detected for " + e.what() + errLoc );
                _cursors[i].reset( NULL );
                conns[i]->done();
                continue;
            }
            catch( std::exception& e ){
                otherExs.push_back( e.what() + errLoc );
                _cursors[i].reset( NULL );
                conns[i]->done();
                continue;
            }
        }

        // Don't exceed our max retries, should not happen
        assert( retries < 5 );
    }
    while( retryQueries.size() > 0 /* something to retry */ &&
           ( socketExs.size() == 0 || returnPartial ) /* no conn issues */ &&
           staleConfigExs.size() == 0 /* no config issues */ &&
           otherExs.size() == 0 /* no other issues */);

    // Assert that our conns are all closed!
    for( vector< shared_ptr<ShardConnection> >::iterator i = conns.begin(); i < conns.end(); ++i ){
        assert( ! (*i) || ! (*i)->ok() );
    }

    // Handle errors we got during initialization.
    // If we're returning partial results, we can ignore socketExs, but nothing else
    // Log a warning in any case, so we don't lose these messages
    bool throwException = ( socketExs.size() > 0 && ! returnPartial ) || staleConfigExs.size() > 0 || otherExs.size() > 0;

    if( socketExs.size() > 0 || staleConfigExs.size() > 0 || otherExs.size() > 0 ) {

        vector<string> errMsgs;

        errMsgs.insert( errMsgs.end(), staleConfigExs.begin(), staleConfigExs.end() );
        errMsgs.insert( errMsgs.end(), otherExs.begin(), otherExs.end() );
        errMsgs.insert( errMsgs.end(), socketExs.begin(), socketExs.end() );

        stringstream errMsg;
        errMsg << "could not initialize cursor across all shards because : ";
        for( vector<string>::iterator i = errMsgs.begin(); i != errMsgs.end(); i++ ){
            if( i != errMsgs.begin() ) errMsg << " :: and :: ";
            errMsg << *i;
        }

        if( throwException && staleConfigExs.size() > 0 )
            throw StaleConfigException( _ns , errMsg.str() , ! allConfigStale );
        else if( throwException )
            throw DBException( errMsg.str(), 14827 );
        else
            warning() << errMsg.str() << endl;
    }

    if( retries > 0 )
        log() << "successfully finished parallel query after " << retries << " retries" << endl;

}
/** Releases the per-server cursor array allocated by _init(), if any. */
ParallelSortClusteredCursor::~ParallelSortClusteredCursor() {
    // Deleting a null pointer is a no-op, so this is safe when _init() never ran.
    delete [] _cursors;
    _cursors = 0;
}
bool ParallelSortClusteredCursor::more() {
if ( _needToSkip > 0 ) {
int n = _needToSkip;
_needToSkip = 0;
while ( n > 0 && more() ) {
BSONObj x = next();
n--;
}
_needToSkip = n;
}
for ( int i=0; i<_numServers; i++ ) {
if ( _cursors[i].more() )
return true;
}
return false;
}
/**
 * Returns the next document in merged order.
 *
 * Every per-server cursor that has a document is peeked, and the one that sorts
 * first under _sortKey is consumed. With an empty sort key the first available
 * cursor is taken. Fails with uassert 10019 ("no more elements") when no cursor
 * has a document.
 */
BSONObj ParallelSortClusteredCursor::next() {
    BSONObj winner;
    int winnerIdx = -1;

    // Visit every server once, starting one past the server used last time.
    for( int step = 1; step <= _numServers; step++ ){
        int idx = ( step + _lastFrom ) % _numServers;

        if ( ! _cursors[idx].more() )
            continue;

        BSONObj candidate = _cursors[idx].peek();

        if ( winner.isEmpty() ) {
            winner = candidate;
            winnerIdx = idx;
            // Without a sort key there is nothing to compare, so take the first hit.
            if( _sortKey.isEmpty() ) break;
            continue;
        }

        // Replace the current winner unless it sorts strictly before the candidate.
        if ( winner.woSortOrder( candidate , _sortKey , true ) >= 0 ) {
            winner = candidate;
            winnerIdx = idx;
        }
    }

    _lastFrom = winnerIdx;

    uassert( 10019 , "no more elements" , ! winner.isEmpty() );
    _cursors[winnerIdx].next();

    return winner;
}
/**
 * Appends one explain document per configured server to `out`, keyed by server name.
 */
void ParallelSortClusteredCursor::_explain( map< string,list<BSONObj> >& out ) {
    set<ServerAndQuery>::iterator it = _servers.begin();
    while ( it != _servers.end() ) {
        const ServerAndQuery& entry = *it;
        // Look up (or create) the server's list before running the explain.
        list<BSONObj> & perServer = out[entry._server];
        perServer.push_back( explain( entry._server , entry._extra ) );
        ++it;
    }
}
// -----------------
// ---- Future -----
// -----------------
/**
 * Starts a command on a server.
 *
 * @param server   server to run the command on.
 * @param db       database name; the command is issued against "<db>.$cmd".
 * @param cmd      command object.
 * @param options  query options passed to the command.
 * @param conn     connection to use; when null, a ScopedDbConnection to
 *                 `server` is opened and held by this object.
 *
 * If the connection supports lazy operation the command is only started here
 * and must be completed by join(). Otherwise it runs to completion immediately.
 * Exceptions are logged and leave the result done with a false status.
 */
Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn )
    :_server(server) ,_db(db) , _options(options), _cmd(cmd) ,_conn(conn) ,_done(false)
{
    try {
        if ( ! _conn ){
            _connHolder.reset( new ScopedDbConnection( _server ) );
            _conn = _connHolder->get();
        }

        if ( ! _conn->lazySupported() ) {
            // Synchronous path.
            _done = true; // we set _done first because even if there is an error we're done
            _ok = _conn->runCommand( db , cmd , _res , options );
        }
        else {
            // Lazy path: send the request now, collect the reply in join().
            _cursor.reset( new DBClientCursor(_conn, _db + ".$cmd", _cmd, -1/*limit*/, 0, NULL, _options, 0));
            _cursor->initLazy();
        }
    }
    catch ( std::exception& e ) {
        error() << "Future::spawnComand (part 1) exception: " << e.what() << endl;
        _ok = false;
        _done = true;
    }
}
/**
 * Completes a lazily started command and returns whether it succeeded.
 *
 * If the result is already done the stored status is returned. Otherwise the
 * lazy cursor is finished, the reply is stored in _res, and the status is taken
 * from the truthiness of its "ok" field. Exceptions are logged and produce a
 * false status. The result is marked done in every case.
 */
bool Future::CommandResult::join() {
    if ( ! _done ) {
        try {
            // TODO: Allow retries?
            bool retry = false;
            bool finished = _cursor->initLazyFinish( retry );

            // Shouldn't need to communicate with server any more
            if ( _connHolder )
                _connHolder->done();

            uassert(14812,  str::stream() << "Error running command on server: " << _server, finished);
            massert(14813, "Command returned nothing", _cursor->more());

            _res = _cursor->nextSafe();
            _ok = _res["ok"].trueValue();
        }
        catch ( std::exception& e ) {
            error() << "Future::spawnComand (part 2) exception: " << e.what() << endl;
            _ok = false;
        }

        _done = true;
    }

    return _ok;
}
/**
 * Creates a CommandResult for the given command and returns it in a shared_ptr.
 * All arguments are forwarded unchanged to the CommandResult constructor.
 */
shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ) {
    return shared_ptr<Future::CommandResult>( new Future::CommandResult( server , db , cmd , options , conn ) );
}
}