SERVER-11611 port rename_collection over to new collection interface

Signed-off-by: Matt Kangas <matt.kangas@mongodb.com>
This commit is contained in:
Amalia Hawkins 2013-12-02 10:31:54 -05:00 committed by Matt Kangas
parent 8416afb7c5
commit 0abf27ae91
2 changed files with 259 additions and 102 deletions

56
jstests/rename7.js Normal file
View File

@ -0,0 +1,56 @@
// ***************************************************************
// rename7.js
// Test renameCollection functionality across different databases.
// ***************************************************************
// Set up namespaces a and b.
admin = db.getMongo().getDB( "admin" );
db_a = db.getMongo().getDB( "db_a" );
db_b = db.getMongo().getDB( "db_b" );
a = db_a.rename7;
b = db_b.rename7;
a.drop();
b.drop();
// Put some documents and indexes in a.
a.save( {a: 1} );
a.save( {a: 2} );
a.save( {a: 3} );
a.ensureIndex( {a: 1} );
a.ensureIndex( {b: 1} );
assert.commandWorked( admin.runCommand( {renameCollection: "db_a.rename7", to: "db_b.rename7"} ) );
assert.eq( 0, a.find().count() );
assert( !db_a.system.namespaces.findOne( {name: "db_a.rename7"} ) );
assert.eq( 3, b.find().count() );
assert( db_b.system.namespaces.findOne( {name: "db_b.rename7"} ) );
assert( b.find( {a: 1} ).explain().cursor.match( /^BtreeCursor/ ) );
a.drop();
b.drop();
// Capped collection testing.
db_a.createCollection( "rename7_capped", {capped:true, size:10000} );
a = db_a.rename7_capped;
b = db_b.rename7_capped;
a.save( {a: 1} );
a.save( {a: 2} );
a.save( {a: 3} );
assert.commandWorked( admin.runCommand( {renameCollection: "db_a.rename7_capped",
to: "db_b.rename7_capped"} ) );
assert.eq( 0, a.find().count() );
assert( !db_a.system.namespaces.findOne( {name: "db_a.rename7_capped"} ) );
assert.eq( 3, b.find().count() );
assert( db_b.system.namespaces.findOne( {name: "db_b.rename7_capped"} ) );
assert.eq( true, db_b.system.namespaces.findOne( {name:"db_b.rename7_capped"} ).options.capped );
assert.eq( 12288, b.stats().storageSize );
a.drop();
b.drop();

View File

@ -29,12 +29,16 @@
*/
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/rename_collection.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/index_builder.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/instance.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/structure/collection.h"
#include "mongo/db/structure/collection_iterator.h"
namespace mongo {
@ -81,18 +85,25 @@ namespace mongo {
BSONObjBuilder builder;
builder.appendElements(stripped);
builder.append("ns", target);
indexes.push_back(builder.done());
indexes.push_back(builder.obj());
}
return indexes;
}
virtual void restoreIndexBuildsOnSource(std::vector<BSONObj> indexesInProg, std::string source) {
string sourceIndexes = nsToDatabase( source ) + ".system.indexes";
IndexBuilder::restoreIndexes( sourceIndexes, indexesInProg );
}
virtual bool run(const string& dbname, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
string source = cmdObj.getStringField( name.c_str() );
string target = cmdObj.getStringField( "to" );
uassert(15967,
"invalid collection name: " + target,
NamespaceString::validCollectionComponent(target.c_str()));
if ( !NamespaceString::validCollectionComponent(target.c_str()) ) {
errmsg = "invalid collection name: " + target;
return false;
}
if ( source.empty() || target.empty() ) {
errmsg = "invalid command syntax";
return false;
@ -100,129 +111,219 @@ namespace mongo {
string sourceDB = nsToDatabase(source);
string targetDB = nsToDatabase(target);
string databaseName = sourceDB;
databaseName += ".system.indexes";
int longestIndexNameLength = 0;
vector<BSONObj> oldIndSpec = Helpers::findAll(databaseName, BSON("ns" << source));
for (size_t i = 0; i < oldIndSpec.size(); ++i) {
int thisLength = oldIndSpec[i].getField("name").valuesize();
if (thisLength > longestIndexNameLength) {
longestIndexNameLength = thisLength;
}
}
unsigned int longestAllowed = maxNamespaceLen - longestIndexNameLength - 1;
if (target.size() > longestAllowed) {
StringBuilder sb;
sb << "collection name length of " << target.size()
<< " exceeds maximum length of " << longestAllowed
<< ", allowing for index names";
uasserted(16451, sb.str());
}
bool capped = false;
long long size = 0;
std::vector<BSONObj> indexesInProg;
{
Client::Context ctx( source );
NamespaceDetails *nsd = nsdetails( source );
uassert( 10026 , "source namespace does not exist", nsd );
indexesInProg = stopIndexBuilds(dbname, cmdObj);
capped = nsd->isCapped();
if ( capped )
for( DiskLoc i = nsd->firstExtent(); !i.isNull(); i = i.ext()->xnext )
size += i.ext()->length;
}
Client::Context srcCtx( source );
Collection* sourceColl = srcCtx.db()->getCollection( source );
Client::Context ctx( target );
if ( nsdetails( target ) ) {
uassert( 10027 , "target namespace exists", cmdObj["dropTarget"].trueValue() );
Status s = cc().database()->dropCollection( target );
if ( !s.isOK() ) {
errmsg = s.toString();
if ( !sourceColl ) {
errmsg = "source namespace does not exist";
return false;
}
// Ensure that collection name does not exceed maximum length.
// Ensure that index names do not push the length over the max.
// Iterator includes unfinished indexes.
IndexCatalog::IndexIterator sourceIndIt =
sourceColl->getIndexCatalog()->getIndexIterator( true );
int longestIndexNameLength = 0;
while ( sourceIndIt.more() ) {
int thisLength = sourceIndIt.next()->getInfoElement("name").valuesize();
if ( thisLength > longestIndexNameLength )
longestIndexNameLength = thisLength;
}
unsigned int longestAllowed = maxNamespaceLen - longestIndexNameLength - 1;
if (target.size() > longestAllowed) {
StringBuilder sb;
sb << "collection name length of " << target.size()
<< " exceeds maximum length of " << longestAllowed
<< ", allowing for index names";
errmsg = sb.str();
return false;
}
{
NamespaceDetails *nsd = nsdetails( source );
indexesInProg = stopIndexBuilds( dbname, cmdObj );
capped = nsd->isCapped();
if ( capped )
for( DiskLoc i = nsd->firstExtent(); !i.isNull(); i = i.ext()->xnext )
size += i.ext()->length;
}
}
// if we are renaming in the same database, just
// rename the namespace and we're done.
{
if ( sourceDB == targetDB ) {
Status s = ctx.db()->renameCollection( source, target, cmdObj["stayTemp"].trueValue() );
Client::Context ctx( target );
// Check if the target namespace exists and if dropTarget is true.
// If target exists and dropTarget is not true, return false.
if ( ctx.db()->getCollection( target ) ) {
if ( !cmdObj["dropTarget"].trueValue() ) {
errmsg = "target namespace exists";
return false;
}
Status s = cc().database()->dropCollection( target );
if ( !s.isOK() ) {
errmsg = s.toString();
restoreIndexBuildsOnSource( indexesInProg, source );
return false;
}
}
// If we are renaming in the same database, just
// rename the namespace and we're done.
if ( sourceDB == targetDB ) {
Status s = ctx.db()->renameCollection( source, target,
cmdObj["stayTemp"].trueValue() );
if ( !s.isOK() ) {
errmsg = s.toString();
restoreIndexBuildsOnSource( indexesInProg, source );
return false;
}
return true;
}
}
// renaming across databases, so we must copy all
// the data and then remove the source collection.
BSONObjBuilder spec;
if ( capped ) {
spec.appendBool( "capped", true );
spec.append( "size", double( size ) );
}
if ( !userCreateNS( target.c_str(), spec.done(), errmsg, false ) )
return false;
// Otherwise, we are enaming across databases, so we must copy all
// the data and then remove the source collection.
auto_ptr< DBClientCursor > c;
DBDirectClient bridge;
{
c = bridge.query( source, BSONObj(), 0, 0, 0, fromRepl ? QueryOption_SlaveOk : 0 );
}
while( 1 ) {
{
if ( !c->more() )
break;
// Create the target collection.
Collection* targetColl = NULL;
if ( capped ) {
BSONObjBuilder spec;
spec.appendBool( "capped", true );
spec.append( "size", double( size ) );
spec.appendBool( "autoIndexId", false );
userCreateNS( target.c_str(), spec.obj(), errmsg, false );
targetColl = ctx.db()->getCollection( target );
}
BSONObj o = c->next();
theDataFileMgr.insertWithObjMod( target.c_str(), o );
}
string sourceIndexes = nsToDatabase( source ) + ".system.indexes";
string targetIndexes = nsToDatabase( target ) + ".system.indexes";
{
c = bridge.query( sourceIndexes, QUERY( "ns" << source ), 0, 0, 0, fromRepl ? QueryOption_SlaveOk : 0 );
}
while( 1 ) {
{
if ( !c->more() )
break;
else {
BSONObjBuilder spec;
spec.appendBool( "autoIndexId", false );
const BSONObj options = spec.obj();
// No logOp necessary because the entire renameCollection command is one logOp.
targetColl = ctx.db()->createCollection( target, false, &options, true );
}
BSONObj o = c->next();
BSONObjBuilder b;
BSONObjIterator i( o );
while( i.moreWithEOO() ) {
BSONElement e = i.next();
if ( e.eoo() )
break;
if ( strcmp( e.fieldName(), "ns" ) == 0 ) {
b.append( "ns", target );
}
else {
b.append( e );
}
}
BSONObj n = b.done();
theDataFileMgr.insertWithObjMod( targetIndexes.c_str(), n );
}
{
Client::Context ctx( source );
Status s = ctx.db()->dropCollection( source );
if ( !s.isOK() ) {
errmsg = s.toString();
if ( !targetColl ) {
errmsg = "Failed to create target collection.";
restoreIndexBuildsOnSource( indexesInProg, source );
return false;
}
IndexBuilder::restoreIndexes(targetIndexes, indexesInProg);
}
// Copy over all the data from source collection to target collection.
bool insertSuccessful = true;
boost::scoped_ptr<CollectionIterator> sourceIt;
{
Client::Context srcCtx( source );
Collection* sourceColl = srcCtx.db()->getCollection( source );
sourceIt.reset( sourceColl->getIterator( DiskLoc(), false, CollectionScanParams::FORWARD ) );
}
Collection* targetColl = NULL;
while ( !sourceIt->isEOF() ) {
BSONObj o;
{
Client::Context srcCtx( source );
o = sourceIt->getNext().obj();
}
// Insert and check return status of insert.
{
Client::Context ctx( target );
if ( !targetColl )
targetColl = ctx.db()->getCollection( target );
// No logOp necessary because the entire renameCollection command is one logOp.
Status s = targetColl->insertDocument( o, true ).getStatus();
if ( !s.isOK() ) {
insertSuccessful = false;
errmsg = s.toString();
break;
}
}
}
// If inserts were unsuccessful, drop the target collection and return false.
if ( !insertSuccessful ) {
Client::Context ctx( target );
Status s = ctx.db()->dropCollection( target );
if ( !s.isOK() )
errmsg = s.toString();
restoreIndexBuildsOnSource( indexesInProg, source );
return false;
}
// Copy over the indexes to temp storage and then to the target..
vector<BSONObj> copiedIndexes;
bool indexSuccessful = true;
{
Client::Context srcCtx( source );
Collection* sourceColl = srcCtx.db()->getCollection( source );
IndexCatalog::IndexIterator sourceIndIt =
sourceColl->getIndexCatalog()->getIndexIterator( true );
while ( sourceIndIt.more() ) {
BSONObj currIndex = sourceIndIt.next()->infoObj();
// Process the source index.
BSONObjBuilder b;
BSONObjIterator i( currIndex );
while( i.moreWithEOO() ) {
BSONElement e = i.next();
if ( e.eoo() )
break;
else if ( strcmp( e.fieldName(), "ns" ) == 0 )
b.append( "ns", target );
else
b.append( e );
}
BSONObj newIndex = b.obj();
copiedIndexes.push_back( newIndex );
}
}
{
Client::Context ctx( target );
if ( !targetColl )
targetColl = ctx.db()->getCollection( target );
for ( vector<BSONObj>::iterator it = copiedIndexes.begin();
it != copiedIndexes.end(); ++it ) {
Status s = targetColl->getIndexCatalog()->createIndex( *it, true );
if ( !s.isOK() ) {
indexSuccessful = false;
errmsg = s.toString();
break;
}
}
// If indexes were unsuccessful, drop the target collection and return false.
if ( !indexSuccessful ) {
Status s = ctx.db()->dropCollection( target );
if ( !s.isOK() )
errmsg = s.toString();
restoreIndexBuildsOnSource( indexesInProg, source );
return false;
}
}
// Drop the source collection.
{
Client::Context srcCtx( source );
Status s = srcCtx.db()->dropCollection( source );
if ( !s.isOK() ) {
errmsg = s.toString();
restoreIndexBuildsOnSource( indexesInProg, source );
return false;
}
}
return true;
}
} cmdrenamecollection;