RDF storage with a Key/Value Engine
Just curious: can I store some RDF statements in a key/value engine like BerkeleyDB (Java Edition)?
Yes, it's like re-inventing the wheel but, again, I like re-inventing the wheel :-)
A BerkeleyDB Database contains a set of key/value pairs, e.g.:
Key | Value |
---|---|
SecurityNumber:9877 | FirstName:John LastName:Doe |
SecurityNumber:9899 | FirstName:Peter LastName:Parker |
SecurityNumber:9988 | FirstName:Edith LastName:Parker |
Keys and values are both stored as arrays of bytes. By default the keys are sorted in byte-by-byte order and the records are stored in a B-Tree. A database can be configured to accept either unique or duplicate keys.
In BerkeleyDB, a Cursor is an iterator used to scan the database: as the keys are sorted, accessing a range of keys is very fast.
Indexes (secondary databases) can be linked to a database. For example, in the previous table, if I want to quickly access the persons having LastName=="Parker", I would create a secondary database on LastName. Deleting an item in the secondary database automatically deletes the corresponding item in the primary database. As far as I know, you cannot create a secondary database if your primary database allows duplicate keys.
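To picture why sorted keys make range scans cheap, here is a minimal in-memory sketch. It uses `java.util.TreeMap` as a stand-in for the sorted B-Tree, so it is not the BerkeleyDB API; the class name `SortedKeyScan` and the `range` method are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory stand-in for a sorted key/value table (NOT the BerkeleyDB API). */
final class SortedKeyScan
{
    private final NavigableMap<String, String> table = new TreeMap<String, String>();

    void put(String key, String value)
    {
        table.put(key, value);
    }

    /** Returns the values whose keys lie in [fromKey, toKey), in key order. */
    List<String> range(String fromKey, String toKey)
    {
        List<String> values = new ArrayList<String>();
        // subMap() seeks to the first key >= fromKey and then walks forward,
        // which is roughly what a cursor does on a sorted table.
        for (Map.Entry<String, String> e : table.subMap(fromKey, true, toKey, false).entrySet())
        {
            values.add(e.getValue());
        }
        return values;
    }

    public static void main(String[] args)
    {
        SortedKeyScan db = new SortedKeyScan();
        db.put("SecurityNumber:9988", "FirstName:Edith LastName:Parker");
        db.put("SecurityNumber:9877", "FirstName:John LastName:Doe");
        db.put("SecurityNumber:9899", "FirstName:Peter LastName:Parker");
        // Only the keys between 9800 (inclusive) and 9900 (exclusive) are visited.
        System.out.println(db.range("SecurityNumber:9800", "SecurityNumber:9900"));
    }
}
```

The insertion order does not matter: the scan always returns the records in key order, and it never touches the keys outside the requested range.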
Key2 | Key1 | Value1 |
---|---|---|
LastName:Doe | SecurityNumber:9877 | FirstName:John LastName:Doe |
LastName:Parker | SecurityNumber:9899 | FirstName:Peter LastName:Parker |
LastName:Parker | SecurityNumber:9988 | FirstName:Edith LastName:Parker |
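The relation between the two tables can be sketched with plain Java collections. This is only an illustration of the idea, assuming an index that maps each LastName to the set of primary keys carrying it; the class `SecondaryIndexSketch` and its methods are hypothetical and BerkeleyDB maintains its secondary databases for you.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hand-maintained secondary index (illustration only, NOT the BerkeleyDB API). */
final class SecondaryIndexSketch
{
    /** primary table: SecurityNumber -> {firstName, lastName} */
    private final Map<Integer, String[]> primary = new TreeMap<Integer, String[]>();
    /** secondary index: lastName -> primary keys; one lastName may map to many keys */
    private final Map<String, TreeSet<Integer>> byLastName = new TreeMap<String, TreeSet<Integer>>();

    void put(int securityNumber, String firstName, String lastName)
    {
        remove(securityNumber); // drop any stale index entry before overwriting
        primary.put(securityNumber, new String[] { firstName, lastName });
        byLastName.computeIfAbsent(lastName, k -> new TreeSet<Integer>()).add(securityNumber);
    }

    void remove(int securityNumber)
    {
        String[] old = primary.remove(securityNumber);
        if (old == null) return;
        TreeSet<Integer> keys = byLastName.get(old[1]);
        keys.remove(securityNumber);
        if (keys.isEmpty()) byLastName.remove(old[1]);
    }

    /** Looks up the index first, then fetches each record from the primary table. */
    List<String> firstNamesOf(String lastName)
    {
        List<String> names = new ArrayList<String>();
        TreeSet<Integer> keys = byLastName.get(lastName);
        if (keys == null) return names;
        for (Integer key : keys) names.add(primary.get(key)[0]);
        return names;
    }

    public static void main(String[] args)
    {
        SecondaryIndexSketch db = new SecondaryIndexSketch();
        db.put(9877, "John", "Doe");
        db.put(9899, "Peter", "Parker");
        db.put(9988, "Edith", "Parker");
        System.out.println(db.firstNamesOf("Parker"));
    }
}
```

The primary keys must be unique here, otherwise an index entry could not point back to exactly one record.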
OK, now I want to store some RDF statements, that is to say something like the following triple:
{
SUBJECT = RESOURCE;
PREDICATE = RESOURCE;
OBJECT = ( RESOURCE || LITERAL)
}
I need to create indexes to quickly find any statement matching one, two or all three components of the statement.
In the solution I've implemented, there is only one primary database: all the components of a statement are stored in the 'DATA', and the 'KEY' of the database is just a unique number.
Key | Value |
---|---|
1 | (s1,p1,o1) |
2 | (s2,p2,o2) |
3 | (s3,p3,o3) |
Creating the Database 'triplesDB' :
EnvironmentConfig envCfg= new EnvironmentConfig();
envCfg.setAllowCreate(true);
this.environment= new Environment(envFile,envCfg);
DatabaseConfig cfg= new DatabaseConfig();
cfg.setAllowCreate(true);
cfg.setSortedDuplicates(false);
this.triplesDB= this.environment.openDatabase( null, "triples", cfg);
We then create the secondary indexes, one for each component of a statement (subject2triple, predicate2triple, objectLiteral2triple, objectRsrc2triple). For example, the following code opens the secondary database 'objectRsrc2triple'. We create a 'SecondaryKeyCreator' to tell BerkeleyDB how it should extract the secondary key from the primary data.
config2= new SecondaryConfig();
config2.setAllowCreate(true);
config2.setSortedDuplicates(true);
config2.setKeyCreator(new SecondaryKeyCreator()
{
@Override
public boolean createSecondaryKey(SecondaryDatabase arg0,
DatabaseEntry key,
DatabaseEntry data,
DatabaseEntry result) throws DatabaseException {
Statement stmt= STMT_VALUE_BINDING.entryToObject(data);
if(!stmt.getValue().isResource()) return false;
Resource L= Resource.class.cast(stmt.getValue());
TupleOutput out= new TupleOutput();
saveResource(L, out);
result.setData(out.toByteArray());
return true;
}
});
this.objectRsrc2triple=this.environment.openSecondaryDatabase(null,"objectRsrc2triple", triplesDB, config2);
We need some methods to write and read the components of a Statement to/from an array of bytes:
private static class StatementBinding
extends TupleBinding<Statement>
{
public void objectToEntry(Statement stmt, TupleOutput out)
{
saveResource(stmt.getSubject(),out);
saveResource(stmt.getPredicate(),out);
if(stmt.getValue().isResource())
{
out.writeByte(OPCODE_RESOURCE);
saveResource(Resource.class.cast(stmt.getValue()),out);
}
else
{
out.writeByte(OPCODE_LITERAL);
saveLiteral(Literal.class.cast(stmt.getValue()),out);
}
}
public Statement entryToObject(TupleInput in)
{
Resource subject = readResource(in);
Resource predicate = readResource(in);
RDFNode object=null;
switch(in.readByte())
{
case OPCODE_RESOURCE:
{
object= readResource(in);
break;
}
case OPCODE_LITERAL:
{
object= readLiteral(in);
break;
}
default: throw new IllegalStateException("Unknown opcode");
}
return new Statement(subject,predicate,object);
}
}
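The byte layout used above (subject, predicate, one opcode byte, then the object) can be tried without BerkeleyDB. The following is a rough sketch that swaps `TupleOutput`/`TupleInput` for the JDK's `DataOutputStream`/`DataInputStream`; the `TripleCodec` and `Triple` classes are hypothetical simplifications where every node is a plain String.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Round-trips a simplified triple through a byte array (JDK streams, not TupleOutput). */
final class TripleCodec
{
    static final byte OPCODE_RESOURCE = 'R';
    static final byte OPCODE_LITERAL = 'L';

    /** Hypothetical, simplified statement: the object is either a URI or a literal text. */
    static final class Triple
    {
        final String subject;
        final String predicate;
        final String object;
        final boolean objectIsResource;

        Triple(String subject, String predicate, String object, boolean objectIsResource)
        {
            this.subject = subject;
            this.predicate = predicate;
            this.object = object;
            this.objectIsResource = objectIsResource;
        }
    }

    static byte[] encode(Triple t)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(t.subject);   // writeUTF is limited to 65535 encoded bytes per string
            out.writeUTF(t.predicate);
            out.writeByte(t.objectIsResource ? OPCODE_RESOURCE : OPCODE_LITERAL);
            out.writeUTF(t.object);
            out.flush();
            return bytes.toByteArray();
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    static Triple decode(byte[] data)
    {
        try
        {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            String subject = in.readUTF();
            String predicate = in.readUTF();
            byte opcode = in.readByte();
            if (opcode != OPCODE_RESOURCE && opcode != OPCODE_LITERAL)
            {
                throw new IllegalStateException("Unknown opcode " + opcode);
            }
            return new Triple(subject, predicate, in.readUTF(), opcode == OPCODE_RESOURCE);
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }
}
```

The opcode byte is what lets the reader decide, without any other context, whether the last field must be rebuilt as a Resource or as a Literal.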
I've also wrapped the Cursor in a java.util.Iterator. One interesting Iterator is the JoinIterator, which quickly retrieves the *common* Statements returned by a set of distinct 'Cursors'. On the first call, each cursor is positioned on its searched key; then we let the BerkeleyDB API find the intersection of those cursors.
private class JoinIterator
extends AbstractIterator<Statement>
{
/** the joined iterators */
protected List<CursorAndEntries> cursorEntries;
/** our join cursor */
private JoinCursor joinCursor=null;
/** current key */
protected DatabaseEntry keyEntry=new DatabaseEntry();
/** current value */
protected DatabaseEntry valueEntry=new DatabaseEntry();
protected JoinIterator(List<CursorAndEntries> cursorsEntries)
{
this.cursorEntries=new ArrayList<CursorAndEntries>(cursorsEntries);
}
protected Statement readNext() throws DatabaseException
{
if(super._firstCall)
{
super._firstCall=false;
for(CursorAndEntries ca:this.cursorEntries)
{
if(ca.cursor.getSearchKey(ca.keyEntry,ca.valueEntry,null)!=OperationStatus.SUCCESS)
{
return null;
}
}
Cursor cursors[]= new Cursor[this.cursorEntries.size()];
for(int i=0;i< this.cursorEntries.size();++i)
{
cursors[i]= cursorEntries.get(i).cursor;
}
joinCursor = BerkeleyDBModel.this.triplesDB.join(cursors, null);
}
if(joinCursor.getNext(keyEntry, valueEntry,LockMode.DEFAULT)==OperationStatus.SUCCESS)
{
return STMT_VALUE_BINDING.entryToObject(valueEntry);
}
else
{
return null;
}
}
@Override
public void close()
{
for(CursorAndEntries cursor:cursorEntries)
{
try {
if(cursor.cursor!=null) cursor.cursor.close();
cursor.cursor=null;
} catch (Exception e) {
e.printStackTrace();
}
}
try {
if(joinCursor!=null) joinCursor.close();
joinCursor=null;
} catch (Exception e) {
e.printStackTrace();
}
}
}
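Conceptually, the join keeps the primary keys that every cursor agrees on. The sketch below shows one classic way to intersect sorted lists of keys with a merge walk; it only illustrates the idea and says nothing about how BerkeleyDB implements its JoinCursor internally. The class `SortedIntersection` and the sample ids are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Intersection of ascending lists of primary keys (illustration of the join idea). */
final class SortedIntersection
{
    /** Walks both ascending lists in parallel and keeps the keys found in both. */
    static List<Integer> intersect(List<Integer> a, List<Integer> b)
    {
        List<Integer> common = new ArrayList<Integer>();
        int i = 0;
        int j = 0;
        while (i < a.size() && j < b.size())
        {
            int cmp = a.get(i).compareTo(b.get(j));
            if (cmp == 0)
            {
                common.add(a.get(i));
                i++;
                j++;
            }
            else if (cmp < 0) i++; // a is behind: advance it
            else j++;              // b is behind: advance it
        }
        return common;
    }

    /** Intersects any number of ascending lists; an empty input gives an empty result. */
    static List<Integer> intersectAll(List<List<Integer>> lists)
    {
        if (lists.isEmpty()) return new ArrayList<Integer>();
        List<Integer> result = new ArrayList<Integer>(lists.get(0));
        for (int k = 1; k < lists.size(); k++)
        {
            result = intersect(result, lists.get(k));
        }
        return result;
    }

    public static void main(String[] args)
    {
        List<Integer> bySubject = Arrays.asList(1, 4, 7, 9);
        List<Integer> byPredicate = Arrays.asList(2, 4, 9, 12);
        List<Integer> byObject = Arrays.asList(4, 5, 9);
        // ids of the statements matching the subject AND the predicate AND the object
        System.out.println(intersectAll(Arrays.asList(bySubject, byPredicate, byObject)));
    }
}
```

Each list plays the role of one positioned secondary cursor: the ids it yields are the primary keys of the statements sharing that subject, predicate or object.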
This JoinIterator is then used for any specific query. For example, the following method 'listStatements' takes three parameters (s,p,o) and returns an Iterator over the matching Statements. If a parameter is null, it is treated as a wildcard; for each non-null parameter, we open a cursor on the corresponding secondary database and let the join find the matching statements.
public CloseableIterator<Statement> listStatements(
Resource s,
Resource p,
RDFNode o) throws RDFException
{
if(s==null && p==null && o==null)
{
return listStatements();
}
try
{
List<CursorAndEntries> cursors= new ArrayList<CursorAndEntries>(3);
if(s!=null)/** subject is not null */
{
CursorAndEntries ca= new CursorAndEntries();
ca.cursor=this.subject2triple.openCursor(null, null);
RSRC_KEY_BINDING.objectToEntry(s, ca.keyEntry);
cursors.add(ca);
}
if(p!=null)/** predicate is not null */
{
CursorAndEntries ca= new CursorAndEntries();
ca.cursor=this.predicate2triple.openCursor(null, null);
RSRC_KEY_BINDING.objectToEntry(p, ca.keyEntry);
cursors.add(ca);
}
if(o!=null)/** object is not null */
{
if(o.isResource())
{
CursorAndEntries ca= new CursorAndEntries();
ca.cursor=this.objectRsrc2triple.openCursor(null, null);
RSRC_KEY_BINDING.objectToEntry(Resource.class.cast(o), ca.keyEntry);
cursors.add(ca);
}
else
{
CursorAndEntries ca= new CursorAndEntries();
ca.cursor=this.objectLiteral2triple.openCursor(null, null);
LITERAL_KEY_BINDING.objectToEntry(Literal.class.cast(o), ca.keyEntry);
cursors.add(ca);
}
}
return new JoinIterator(cursors);
}
catch (DatabaseException e)
{
throw new RDFException(e);
}
}
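The same query logic can be tried in memory. This is a deliberately small sketch, assuming three hash indexes instead of secondary databases and plain Strings instead of Resource/Literal (so it does not distinguish a literal object from a resource object); `MiniTripleStore` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Tiny in-memory triple store with null-as-wildcard queries (illustration only). */
final class MiniTripleStore
{
    /** the position in this list is the statement id */
    private final List<String[]> triples = new ArrayList<String[]>();
    /** one index per position: 0=subject, 1=predicate, 2=object */
    private final List<Map<String, TreeSet<Integer>>> indexes = new ArrayList<Map<String, TreeSet<Integer>>>();

    MiniTripleStore()
    {
        for (int i = 0; i < 3; i++) indexes.add(new HashMap<String, TreeSet<Integer>>());
    }

    /** Adds the triple unless it is already stored; returns true if it was added. */
    boolean add(String s, String p, String o)
    {
        if (!list(s, p, o).isEmpty()) return false; // the same check-before-insert as in the store above
        String[] t = { s, p, o };
        int id = triples.size();
        triples.add(t);
        for (int i = 0; i < 3; i++)
        {
            indexes.get(i).computeIfAbsent(t[i], k -> new TreeSet<Integer>()).add(id);
        }
        return true;
    }

    /** Lists the matching triples; a null argument matches anything. */
    List<String[]> list(String s, String p, String o)
    {
        String[] pattern = { s, p, o };
        TreeSet<Integer> ids = null; // null means "no constraint seen yet"
        for (int i = 0; i < 3; i++)
        {
            if (pattern[i] == null) continue; // wildcard: this index is not consulted
            TreeSet<Integer> hits = indexes.get(i).get(pattern[i]);
            if (hits == null) return new ArrayList<String[]>();
            if (ids == null) ids = new TreeSet<Integer>(hits);
            else ids.retainAll(hits); // keep only the ids present in every index
        }
        if (ids == null) return new ArrayList<String[]>(triples); // all wildcards: full scan
        List<String[]> result = new ArrayList<String[]>();
        for (int id : ids) result.add(triples.get(id));
        return result;
    }
}
```

For instance, after adding (a,p,b), (a,p,c) and (d,p,b), the call `list("a", null, null)` returns the first two triples and `list(null, "p", "b")` returns the first and the third.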
Performance
This RDF store was used to download and parse a remote gzipped file from geneontology.org. The uncompressed file is ~61.0 MB and contains 774579 statements. It took about 6 minutes to download and digest the file. Remember that each time a statement is about to be inserted, we need to check that it doesn't already exist in the database. The amount of space required to store the database was 591 MB (ouch!!).
This code was my first idea about how to solve this problem. Obviously, some other engines are far more efficient :-) : "(...) Good progress though, last night's run yielded 5 billion triples loaded in just under 10 hours for an average throughput of 135k triples per second. Max throughput was just above 210k triples per second. 1 billion triples was reached in an astonishing 78 minutes. (...)"
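To put these figures in perspective, here is a back-of-the-envelope calculation. It assumes that 1 MB means 10^6 bytes and that the whole 6 minutes were spent loading, although the download time is included in that figure, so the real insertion rate is somewhat higher. The class `LoadStats` is only a throwaway helper.

```java
/** Back-of-the-envelope figures derived from the numbers quoted in this post. */
final class LoadStats
{
    static final long STATEMENTS = 774_579L;
    static final long DB_BYTES = 591_000_000L;     // assumption: 1 MB = 10^6 bytes
    static final long SOURCE_BYTES = 61_000_000L;  // uncompressed input file
    static final long SECONDS = 6 * 60L;           // "about 6 minutes", download included

    static double bytesPerStatement()
    {
        return (double) DB_BYTES / STATEMENTS;
    }

    static double statementsPerSecond()
    {
        return (double) STATEMENTS / SECONDS;
    }

    static double expansionFactor()
    {
        return (double) DB_BYTES / SOURCE_BYTES;
    }

    public static void main(String[] args)
    {
        System.out.println(String.format("bytes per statement   : %.0f", bytesPerStatement()));
        System.out.println(String.format("statements per second : %.0f", statementsPerSecond()));
        System.out.println(String.format("database / source size: %.1f", expansionFactor()));
    }
}
```

That is roughly 763 bytes on disk per statement, about 2,150 statements per second, and a database almost 10 times larger than the uncompressed source. Storing each URI as a full string in the primary record and again in the secondary indexes is a plausible reason for that size, but I have not measured it.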
As a conclusion, here is the full source code for the first version of this storage engine:
package org.lindenb.sw.model;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import javax.xml.stream.XMLStreamException;
import org.lindenb.sw.RDFException;
import org.lindenb.sw.io.RDFHandler;
import org.lindenb.sw.nodes.Literal;
import org.lindenb.sw.nodes.RDFNode;
import org.lindenb.sw.nodes.Resource;
import org.lindenb.sw.nodes.Statement;
import org.lindenb.util.iterator.CloseableIterator;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.JoinCursor;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.SecondaryConfig;
import com.sleepycat.je.SecondaryDatabase;
import com.sleepycat.je.SecondaryKeyCreator;
public class BerkeleyDBModel
{
private static final byte OPCODE_RESOURCE = 'R';
private static final byte OPCODE_LITERAL = 'L';
private Environment environment;
private Database triplesDB;
private SecondaryDatabase subject2triple= null;
private SecondaryDatabase predicate2triple= null;
private SecondaryDatabase objectLiteral2triple= null;
private SecondaryDatabase objectRsrc2triple= null;
private static final StatementBinding STMT_VALUE_BINDING= new StatementBinding();
private static final ResourceBinding RSRC_KEY_BINDING= new ResourceBinding();
private static final LiteralBinding LITERAL_KEY_BINDING= new LiteralBinding();
/**
* TupleBinding for a Resource
*
*/
private static class ResourceBinding
extends TupleBinding<Resource>
{
public void objectToEntry(Resource rsrc, TupleOutput out)
{
saveResource(rsrc,out);
}
public Resource entryToObject(TupleInput in)
{
return readResource(in);
}
}
/**
* TupleBinding for a Literal
*
*/
private static class LiteralBinding
extends TupleBinding<Literal>
{
public void objectToEntry(Literal rsrc, TupleOutput out)
{
saveLiteral(rsrc,out);
}
public Literal entryToObject(TupleInput in)
{
return readLiteral(in);
}
}
/**
* TupleBinding for a Statement
*
*/
private static class StatementBinding
extends TupleBinding<Statement>
{
public void objectToEntry(Statement stmt, TupleOutput out)
{
saveResource(stmt.getSubject(),out);
saveResource(stmt.getPredicate(),out);
if(stmt.getValue().isResource())
{
out.writeByte(OPCODE_RESOURCE);
saveResource(Resource.class.cast(stmt.getValue()),out);
}
else
{
out.writeByte(OPCODE_LITERAL);
saveLiteral(Literal.class.cast(stmt.getValue()),out);
}
}
public Statement entryToObject(TupleInput in)
{
Resource subject = readResource(in);
Resource predicate = readResource(in);
RDFNode object=null;
switch(in.readByte())
{
case OPCODE_RESOURCE:
{
object= readResource(in);
break;
}
case OPCODE_LITERAL:
{
object= readLiteral(in);
break;
}
default: throw new IllegalStateException("Unknown opcode");
}
return new Statement(subject,predicate,object);
}
}
/**
* AbstractIterator
*/
private abstract class AbstractIterator<T>
implements CloseableIterator<T>
{
protected T _object=null;
private boolean _nextTested=false;
private boolean _hasNext=false;
protected boolean _firstCall=true;
protected AbstractIterator()
{
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public boolean hasNext()
{
if(_nextTested) return _hasNext;
_nextTested=true;
_hasNext=false;
T obj=null;
try
{
obj=readNext();
_firstCall=false;
if(obj!=null)
{
_object=obj;
_hasNext=true;
}
}
catch(DatabaseException err)
{
err.printStackTrace();
}
if(!_hasNext)
{
close();
}
return _hasNext;
}
protected abstract T readNext() throws DatabaseException;
@Override
public T next()
{
if(!_nextTested)
{
if(!hasNext()) throw new IllegalStateException();
}
_nextTested=false;
_hasNext=false;
T x= _object;
_object=null;
return x;
}
@Override
public abstract void close();
}
/**
* CursorIterator
*/
private abstract class CursorIterator<T>
extends AbstractIterator<T>
{
protected Cursor cursor;
private DatabaseEntry keyEntry;
private DatabaseEntry valueEntry;
protected CursorIterator(Cursor cursor)
{
this.cursor=cursor;
this.keyEntry= new DatabaseEntry();
this.valueEntry= new DatabaseEntry();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
protected abstract T readNext(DatabaseEntry key,DatabaseEntry value) throws DatabaseException;
protected final T readNext() throws DatabaseException
{
if(this.cursor==null) return null;
return readNext(this.keyEntry,this.valueEntry);
}
@Override
public void close()
{
if(this.cursor!=null)
{
try { this.cursor.close(); }
catch (Exception e) { e.printStackTrace();}
}
this.cursor=null;
}
}
/**
* A container for the 3 values used
* in the following next JoinIterator
**/
private static class CursorAndEntries
{
Cursor cursor;
DatabaseEntry keyEntry=new DatabaseEntry();
DatabaseEntry valueEntry=new DatabaseEntry();
}
/**
* JoinIterator
*/
private class JoinIterator
extends AbstractIterator<Statement>
{
/** the joined iterators */
protected List<CursorAndEntries> cursorEntries;
/** our join cursor */
private JoinCursor joinCursor=null;
/** current key */
protected DatabaseEntry keyEntry=new DatabaseEntry();
/** current value */
protected DatabaseEntry valueEntry=new DatabaseEntry();
protected JoinIterator(List<CursorAndEntries> cursorsEntries)
{
this.cursorEntries=new ArrayList<CursorAndEntries>(cursorsEntries);
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
protected Statement readNext() throws DatabaseException
{
if(super._firstCall)
{
super._firstCall=false;
for(CursorAndEntries ca:this.cursorEntries)
{
if(ca.cursor.getSearchKey(ca.keyEntry,ca.valueEntry,null)!=OperationStatus.SUCCESS)
{
return null;
}
}
Cursor cursors[]= new Cursor[this.cursorEntries.size()];
for(int i=0;i< this.cursorEntries.size();++i)
{
cursors[i]= cursorEntries.get(i).cursor;
}
joinCursor = BerkeleyDBModel.this.triplesDB.join(cursors, null);
}
if(joinCursor.getNext(keyEntry, valueEntry, LockMode.DEFAULT)==OperationStatus.SUCCESS)
{
return STMT_VALUE_BINDING.entryToObject(valueEntry);
}
else
{
return null;
}
}
@Override
public void close()
{
for(CursorAndEntries cursor:cursorEntries)
{
try {
if(cursor.cursor!=null) cursor.cursor.close();
cursor.cursor=null;
} catch (Exception e) {
e.printStackTrace();
}
}
try {
if(joinCursor!=null) joinCursor.close();
joinCursor=null;
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
*
* BerkeleyDBModel
*
*/
public BerkeleyDBModel(
File envFile
) throws RDFException
{
try {
EnvironmentConfig envCfg= new EnvironmentConfig();
envCfg.setAllowCreate(true);
this.environment= new Environment(envFile,envCfg);
DatabaseConfig cfg= new DatabaseConfig();
cfg.setAllowCreate(true);
cfg.setSortedDuplicates(false);
this.triplesDB= this.environment.openDatabase(
null, "triples", cfg);
/* create secondary key on literal as value */
SecondaryConfig config2= new SecondaryConfig();
config2.setAllowCreate(true);
config2.setSortedDuplicates(true);
config2.setKeyCreator(new SecondaryKeyCreator()
{
@Override
public boolean createSecondaryKey(SecondaryDatabase arg0,
DatabaseEntry key,
DatabaseEntry data,
DatabaseEntry result) throws DatabaseException
{
Statement stmt= STMT_VALUE_BINDING.entryToObject(data);
if(stmt.getValue().isResource()) return false;
Literal L= Literal.class.cast(stmt.getValue());
TupleOutput out= new TupleOutput();
saveLiteral(L, out);
result.setData(out.toByteArray());
return true;
}
});
this.objectLiteral2triple=this.environment.openSecondaryDatabase(null,"objectLiteral2triple", triplesDB, config2);
/* create secondary key on resource as value */
config2= new SecondaryConfig();
config2.setAllowCreate(true);
config2.setSortedDuplicates(true);
config2.setKeyCreator(new SecondaryKeyCreator()
{
@Override
public boolean createSecondaryKey(SecondaryDatabase arg0,
DatabaseEntry key,
DatabaseEntry data,
DatabaseEntry result) throws DatabaseException {
Statement stmt= STMT_VALUE_BINDING.entryToObject(data);
if(!stmt.getValue().isResource()) return false;
Resource L= Resource.class.cast(stmt.getValue());
TupleOutput out= new TupleOutput();
saveResource(L, out);
result.setData(out.toByteArray());
return true;
}
});
this.objectRsrc2triple=this.environment.openSecondaryDatabase(null,"objectRsrc2triple", triplesDB, config2);
/* create secondary key on subject */
config2= new SecondaryConfig();
config2.setAllowCreate(true);
config2.setSortedDuplicates(true);
config2.setKeyCreator(new SecondaryKeyCreator()
{
@Override
public boolean createSecondaryKey(SecondaryDatabase arg0,
DatabaseEntry key,
DatabaseEntry data,
DatabaseEntry result) throws DatabaseException {
Statement stmt= STMT_VALUE_BINDING.entryToObject(data);
TupleOutput out= new TupleOutput();
saveResource(stmt.getSubject(), out);
result.setData(out.toByteArray());
return true;
}
});
this.subject2triple=this.environment.openSecondaryDatabase(null, "subject2triple", triplesDB, config2);
/* create secondary key on predicate */
config2= new SecondaryConfig();
config2.setAllowCreate(true);
config2.setSortedDuplicates(true);
config2.setKeyCreator(new SecondaryKeyCreator()
{
@Override
public boolean createSecondaryKey(SecondaryDatabase arg0,
DatabaseEntry key,
DatabaseEntry data,
DatabaseEntry result) throws DatabaseException {
Statement stmt= STMT_VALUE_BINDING.entryToObject(data);
TupleOutput out= new TupleOutput();
saveResource(stmt.getPredicate(), out);
result.setData(
out.getBufferBytes(),
out.getBufferOffset(),
out.getBufferLength()
);
return true;
}
});
this.predicate2triple=this.environment.openSecondaryDatabase(null, "predicate2triple", triplesDB, config2);
}
catch (DatabaseException e)
{
throw new RDFException(e);
}
}
/** Close this model */
public void close() throws RDFException
{
try
{
subject2triple.close();
predicate2triple.close();
objectLiteral2triple.close();
objectRsrc2triple.close();
triplesDB.close();
environment.close();
} catch(DatabaseException err)
{
throw new RDFException(err);
}
subject2triple=null;
predicate2triple=null;
objectLiteral2triple=null;
objectRsrc2triple=null;
triplesDB=null;
environment=null;
}
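public void clear() throws RDFException
{
Cursor c= null;
try {
DatabaseEntry key= new DatabaseEntry();
DatabaseEntry data= new DatabaseEntry();
c= triplesDB.openCursor(null, null);
/* walk the primary database and delete every record */
while(c.getNext(key, data, null)==OperationStatus.SUCCESS)
{
c.delete();
}
} catch (DatabaseException e) {
/* report the failure to the caller instead of swallowing it */
throw new RDFException(e);
}
finally
{
/* always release the cursor, even when the scan fails */
if(c!=null) try { c.close(); } catch(DatabaseException err) {}
}
}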
protected Environment getEnvironment()
{
return this.environment;
}
protected Database getTripleDB()
{
return this.triplesDB;
}
public CloseableIterator<Resource> listSubjects() throws RDFException
{
try {
return new CursorIterator<Resource>(this.subject2triple.openCursor(null, null))
{
@Override
protected Resource readNext(DatabaseEntry key,DatabaseEntry value)
throws DatabaseException
{
if(this.cursor.getNext(key, value, null)==OperationStatus.SUCCESS)
{
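/* on a secondary cursor, 'key' holds the secondary key (the subject) and 'value' holds the primary data */
return RSRC_KEY_BINDING.entryToObject(key);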
}
return null;
}
};
} catch (Exception e)
{
throw new RDFException(e);
}
}
public CloseableIterator<Statement> listStatements() throws RDFException
{
try {
return new CursorIterator<Statement>(this.triplesDB.openCursor(null, null))
{
@Override
protected Statement readNext(DatabaseEntry key,DatabaseEntry
value) throws DatabaseException
{
if(this.cursor.getNext(key, value, null)==OperationStatus.SUCCESS)
{
return STMT_VALUE_BINDING.entryToObject(value);
}
return null;
}
};
} catch (DatabaseException e)
{
throw new RDFException(e);
}
}
public CloseableIterator<Statement> listStatements(
Resource s,
Resource p,
RDFNode o) throws RDFException
{
if(s==null && p==null && o==null)
{
return listStatements();
}
try
{
List<CursorAndEntries> cursors= new ArrayList<CursorAndEntries>(3);
if(s!=null)
{
CursorAndEntries ca= new CursorAndEntries();
ca.cursor=this.subject2triple.openCursor(null, null);
RSRC_KEY_BINDING.objectToEntry(s, ca.keyEntry);
cursors.add(ca);
}
if(p!=null)
{
CursorAndEntries ca= new CursorAndEntries();
ca.cursor=this.predicate2triple.openCursor(null, null);
RSRC_KEY_BINDING.objectToEntry(p, ca.keyEntry);
cursors.add(ca);
}
if(o!=null)
{
if(o.isResource())
{
CursorAndEntries ca= new CursorAndEntries();
ca.cursor=this.objectRsrc2triple.openCursor(null, null);
RSRC_KEY_BINDING.objectToEntry(Resource.class.cast(o), ca.keyEntry);
cursors.add(ca);
}
else
{
CursorAndEntries ca= new CursorAndEntries();
ca.cursor=this.objectLiteral2triple.openCursor(null, null);
LITERAL_KEY_BINDING.objectToEntry(Literal.class.cast(o), ca.keyEntry);
cursors.add(ca);
}
}
return new JoinIterator(cursors);
}
catch (DatabaseException e)
{
throw new RDFException(e);
}
}
public Resource createResource(String uri)
{
return new Resource(uri);
}
public Resource createResource(URI uri)
{
return createResource(uri.toString());
}
public Literal createLiteral(String text)
{
return new Literal(text);
}
private static void saveResource(Resource rsrc,TupleOutput out)
{
out.writeString(rsrc.getURI());
}
private static Resource readResource(TupleInput in)
{
return new Resource(in.readString());
}
private static void saveLiteral(Literal literal,TupleOutput out)
{
out.writeString(literal.getLexicalForm());
}
private static Literal readLiteral(TupleInput in)
{
String s= in.readString();
return new Literal(s);
}
public long size()throws RDFException
{
try
{
return getTripleDB().count();
}catch(DatabaseException err)
{
throw new RDFException(err);
}
}
public boolean contains(Statement stmt ) throws RDFException
{
CloseableIterator<Statement> iter=null;
try {
iter= listStatements(stmt.getSubject(), stmt.getPredicate(),stmt.getValue());
return (iter.hasNext());
} catch (RDFException e) {
throw e;
}
finally
{
if(iter!=null) iter.close();
}
}
public BerkeleyDBModel add(Resource s,Resource p,RDFNode o) throws RDFException
{
return add(new Statement(s,p,o));
}
public BerkeleyDBModel add(Statement stmt) throws RDFException
{
DatabaseEntry key= new DatabaseEntry();
DatabaseEntry value= new DatabaseEntry();
Cursor c=null;
try
{
if(contains(stmt)) return this;
c= triplesDB.openCursor(null, null);
int id=0;
if(c.getLast(key, value, null)==OperationStatus.SUCCESS)
{
id= IntegerBinding.entryToInt(key);
}
STMT_VALUE_BINDING.objectToEntry(stmt, value);
IntegerBinding.intToEntry(id+1,key);
getTripleDB().put(null, key, value);
return this;
}
catch(DatabaseException error)
{
throw new RDFException(error);
}
finally
{
if(c!=null) try {c.close(); } catch(DatabaseException err) {}
}
}
public void read(InputStream in) throws
IOException,RDFException,XMLStreamException
{
org.lindenb.sw.io.RDFHandler h= new RDFHandler()
{
@Override
public void found(URI subject, URI predicate, Object value,
URI dataType, String lang, int index) {
try
{
if(value instanceof URI)
{
add(createResource(subject),createResource(predicate),createResource((URI)value));
}
else
{
add(createResource(subject),createResource(predicate),createLiteral((String)value));
}
} catch(RDFException err)
{
throw new RuntimeException(err);
}
}
};
h.parse(in);
}
public static void main(String[] args) {
BerkeleyDBModel rdfStore= null;
try {
URL url= new
URL("http://archive.geneontology.org/latest-lite/go_20090607-termdb.owl.gz");
rdfStore = new BerkeleyDBModel(new File("/tmp/rdfdb"));
for(int i=0;i<10;i++)
{
long now= System.currentTimeMillis();
rdfStore.clear();
InputStream in= new GZIPInputStream(url.openStream());
rdfStore.read(in);
in.close();
System.err.println("("+i+") "+rdfStore.size()+" statements, time="+(System.currentTimeMillis()-now)/1000+"s");
}
rdfStore.clear();
}
catch (Exception e) {
e.printStackTrace();
}
finally
{
if(rdfStore!=null) try { rdfStore.close(); } catch(Exception err) {err.printStackTrace();}
}
}
}
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import javax.xml.stream.XMLStreamException;
import org.lindenb.sw.RDFException;
import org.lindenb.sw.io.RDFHandler;
import org.lindenb.sw.nodes.Literal;
import org.lindenb.sw.nodes.RDFNode;
import org.lindenb.sw.nodes.Resource;
import org.lindenb.sw.nodes.Statement;
import org.lindenb.util.iterator.CloseableIterator;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.JoinCursor;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.SecondaryConfig;
import com.sleepycat.je.SecondaryDatabase;
import com.sleepycat.je.SecondaryKeyCreator;
public class BerkeleyDBModel
{
private static final byte OPCODE_RESOURCE = 'R';
private static final byte OPCODE_LITERAL = 'L';
private Environment environment;
private Database triplesDB;
private SecondaryDatabase subject2triple= null;
private SecondaryDatabase predicate2triple= null;
private SecondaryDatabase objectLiteral2triple= null;
private SecondaryDatabase objectRsrc2triple= null;
private static final StatementBinding STMT_VALUE_BINDING= new StatementBinding();
private static final ResourceBinding RSRC_KEY_BINDING= new ResourceBinding();
private static final LiteralBinding LITERAL_KEY_BINDING= new LiteralBinding();
/**
* TupleBinding for a Resource
*
*/
private static class ResourceBinding
extends TupleBinding<Resource>
{
public void objectToEntry(Resource rsrc, TupleOutput out)
{
saveResource(rsrc,out);
}
public Resource entryToObject(TupleInput in)
{
return readResource(in);
}
}
/**
* TupleBinding for a Literal
*
*/
private static class LiteralBinding
extends TupleBinding<Literal>
{
public void objectToEntry(Literal rsrc, TupleOutput out)
{
saveLiteral(rsrc,out);
}
public Literal entryToObject(TupleInput in)
{
return readLiteral(in);
}
}
/**
* TupleBinding for a Statement
*
*/
private static class StatementBinding
extends TupleBinding<Statement>
{
public void objectToEntry(Statement stmt, TupleOutput out)
{
saveResource(stmt.getSubject(),out);
saveResource(stmt.getPredicate(),out);
if(stmt.getValue().isResource())
{
out.writeByte(OPCODE_RESOURCE);
saveResource(Resource.class.cast(stmt.getValue()),out);
}
else
{
out.writeByte(OPCODE_LITERAL);
saveLiteral(Literal.class.cast(stmt.getValue()),out);
}
}
public Statement entryToObject(TupleInput in)
{
Resource subject = readResource(in);
Resource predicate = readResource(in);
RDFNode object=null;
switch(in.readByte())
{
case OPCODE_RESOURCE:
{
object= readResource(in);
break;
}
case OPCODE_LITERAL:
{
object= readLiteral(in);
break;
}
default: throw new IllegalStateException("Unknown opcode");
}
return new Statement(subject,predicate,object);
}
}
/**
* AbstractIterator
*/
private abstract class AbstractIterator<T>
implements CloseableIterator<T>
{
protected T _object=null;
private boolean _nextTested=false;
private boolean _hasNext=false;
protected boolean _firstCall=true;
protected AbstractIterator()
{
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public boolean hasNext()
{
if(_nextTested) return _hasNext;
_nextTested=true;
_hasNext=false;
T obj=null;
try
{
obj=readNext();
_firstCall=false;
if(obj!=null)
{
_object=obj;
_hasNext=true;
}
}
catch(DatabaseException err)
{
err.printStackTrace();
}
if(!_hasNext)
{
close();
}
return _hasNext;
}
protected abstract T readNext() throws DatabaseException;
@Override
public T next()
{
if(!_nextTested)
{
if(!hasNext()) throw new IllegalStateException();
}
_nextTested=false;
_hasNext=false;
T x= _object;
_object=null;
return x;
}
@Override
public abstract void close();
}
/**
* CursorIterator
*/
private abstract class CursorIterator<T>
extends AbstractIterator<T>
{
protected Cursor cursor;
private DatabaseEntry keyEntry;
private DatabaseEntry valueEntry;
protected CursorIterator(Cursor cursor)
{
this.cursor=cursor;
this.keyEntry= new DatabaseEntry();
this.valueEntry= new DatabaseEntry();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
protected abstract T readNext(DatabaseEntry key,DatabaseEntry value) throws DatabaseException;
protected final T readNext() throws DatabaseException
{
if(this.cursor==null) return null;
return readNext(this.keyEntry,this.valueEntry);
}
@Override
public void close()
{
if(this.cursor!=null)
{
try { this.cursor.close(); }
catch (Exception e) { e.printStackTrace();}
}
this.cursor=null;
}
}
/**
* Holds a cursor together with its key and value entries;
* used by the JoinIterator below
**/
private static class CursorAndEntries
{
Cursor cursor;
DatabaseEntry keyEntry=new DatabaseEntry();
DatabaseEntry valueEntry=new DatabaseEntry();
}
/**
* JoinIterator
*/
private class JoinIterator
extends AbstractIterator<Statement>
{
/** the joined iterators */
protected List<CursorAndEntries> cursorEntries;
/** our join cursor */
private JoinCursor joinCursor=null;
/** current key */
protected DatabaseEntry keyEntry=new DatabaseEntry();
/** current value */
protected DatabaseEntry valueEntry=new DatabaseEntry();
protected JoinIterator(List<CursorAndEntries> cursorsEntries)
{
this.cursorEntries=new ArrayList<CursorAndEntries>(cursorsEntries);
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
protected Statement readNext() throws DatabaseException
{
if(super._firstCall)
{
super._firstCall=false;
for(CursorAndEntries ca:this.cursorEntries)
{
if(ca.cursor.getSearchKey(ca.keyEntry,ca.valueEntry,null)!=OperationStatus.SUCCESS)
{
return null;
}
}
Cursor cursors[]= new Cursor[this.cursorEntries.size()];
for(int i=0;i< this.cursorEntries.size();++i)
{
cursors[i]= cursorEntries.get(i).cursor;
}
joinCursor = BerkeleyDBModel.this.triplesDB.join(cursors, null);
}
if(joinCursor.getNext(keyEntry, valueEntry, LockMode.DEFAULT)==OperationStatus.SUCCESS)
{
return STMT_VALUE_BINDING.entryToObject(valueEntry);
}
else
{
return null;
}
}
@Override
public void close()
{
for(CursorAndEntries cursor:cursorEntries)
{
try {
if(cursor.cursor!=null) cursor.cursor.close();
cursor.cursor=null;
} catch (Exception e) {
e.printStackTrace();
}
}
try {
if(joinCursor!=null) joinCursor.close();
joinCursor=null;
} catch (Exception e) {
e.printStackTrace();
}
}
}
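/**
* Opens (creating them if needed) the environment stored in envFile,
* the primary "triples" database and its four secondary indexes
* (subject, predicate, resource object, literal object).
*/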
public BerkeleyDBModel(
File envFile
) throws RDFException
{
try {
EnvironmentConfig envCfg= new EnvironmentConfig();
envCfg.setAllowCreate(true);
this.environment= new Environment(envFile,envCfg);
DatabaseConfig cfg= new DatabaseConfig();
cfg.setAllowCreate(true);
cfg.setSortedDuplicates(false);
this.triplesDB= this.environment.openDatabase(
null, "triples", cfg);
/* create secondary key on literal as value */
SecondaryConfig config2= new SecondaryConfig();
config2.setAllowCreate(true);
config2.setSortedDuplicates(true);
config2.setKeyCreator(new SecondaryKeyCreator()
{
@Override
public boolean createSecondaryKey(SecondaryDatabase arg0,
DatabaseEntry key,
DatabaseEntry data,
DatabaseEntry result) throws DatabaseException
{
Statement stmt= STMT_VALUE_BINDING.entryToObject(data);
if(stmt.getValue().isResource()) return false;
Literal L= Literal.class.cast(stmt.getValue());
TupleOutput out= new TupleOutput();
saveLiteral(L, out);
result.setData(out.toByteArray());
return true;
}
});
this.objectLiteral2triple=this.environment.openSecondaryDatabase(null,"objectLiteral2triple", triplesDB, config2);
/* create secondary key on resource as value */
config2= new SecondaryConfig();
config2.setAllowCreate(true);
config2.setSortedDuplicates(true);
config2.setKeyCreator(new SecondaryKeyCreator()
{
@Override
public boolean createSecondaryKey(SecondaryDatabase arg0,
DatabaseEntry key,
DatabaseEntry data,
DatabaseEntry result) throws DatabaseException {
Statement stmt= STMT_VALUE_BINDING.entryToObject(data);
if(!stmt.getValue().isResource()) return false;
Resource L= Resource.class.cast(stmt.getValue());
TupleOutput out= new TupleOutput();
saveResource(L, out);
result.setData(out.toByteArray());
return true;
}
});
this.objectRsrc2triple=this.environment.openSecondaryDatabase(null,"objectRsrc2triple", triplesDB, config2);
/* create secondary key on subject */
config2= new SecondaryConfig();
config2.setAllowCreate(true);
config2.setSortedDuplicates(true);
config2.setKeyCreator(new SecondaryKeyCreator()
{
@Override
public boolean createSecondaryKey(SecondaryDatabase arg0,
DatabaseEntry key,
DatabaseEntry data,
DatabaseEntry result) throws DatabaseException {
Statement stmt= STMT_VALUE_BINDING.entryToObject(data);
TupleOutput out= new TupleOutput();
saveResource(stmt.getSubject(), out);
result.setData(out.toByteArray());
return true;
}
});
this.subject2triple=this.environment.openSecondaryDatabase(null, "subject2triple", triplesDB, config2);
/* create secondary key on predicate */
config2= new SecondaryConfig();
config2.setAllowCreate(true);
config2.setSortedDuplicates(true);
config2.setKeyCreator(new SecondaryKeyCreator()
{
@Override
public boolean createSecondaryKey(SecondaryDatabase arg0,
DatabaseEntry key,
DatabaseEntry data,
DatabaseEntry result) throws DatabaseException {
Statement stmt= STMT_VALUE_BINDING.entryToObject(data);
TupleOutput out= new TupleOutput();
saveResource(stmt.getPredicate(), out);
result.setData(
out.getBufferBytes(),
out.getBufferOffset(),
out.getBufferLength()
);
return true;
}
});
this.predicate2triple=this.environment.openSecondaryDatabase(null, "predicate2triple", triplesDB, config2);
}
catch (DatabaseException e)
{
throw new RDFException(e);
}
}
/** Close this model */
public void close() throws RDFException
{
try
{
subject2triple.close();
predicate2triple.close();
objectLiteral2triple.close();
objectRsrc2triple.close();
triplesDB.close();
environment.close();
} catch(DatabaseException err)
{
throw new RDFException(err);
}
subject2triple=null;
predicate2triple=null;
objectLiteral2triple=null;
objectRsrc2triple=null;
triplesDB=null;
environment=null;
}
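/** Remove all the statements; the secondary indexes are updated by BerkeleyDB */
public void clear() throws RDFException
{
Cursor c=null;
try {
DatabaseEntry key= new DatabaseEntry();
DatabaseEntry data= new DatabaseEntry();
c= triplesDB.openCursor(null, null);
while(c.getNext(key, data, null)==OperationStatus.SUCCESS)
{
c.delete();
}
} catch (DatabaseException e) {
throw new RDFException(e);
}
finally
{
/* always release the cursor, even if a delete failed */
if(c!=null) try { c.close(); } catch(DatabaseException err) { err.printStackTrace(); }
}
}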
protected Environment getEnvironment()
{
return this.environment;
}
protected Database getTripleDB()
{
return this.triplesDB;
}
public CloseableIterator<Resource> listSubjects() throws RDFException
{
try {
return new CursorIterator<Resource>(this.subject2triple.openCursor(null, null))
{
@Override
protected Resource readNext(DatabaseEntry key,DatabaseEntry value)
throws DatabaseException
{
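/* a cursor on a secondary database returns the secondary key (here the subject)
in 'key' and the primary record (the whole statement) in 'value'.
Note: a subject is returned once per statement it appears in. */
if(this.cursor.getNext(key, value, null)==OperationStatus.SUCCESS)
{
return RSRC_KEY_BINDING.entryToObject(key);
}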
return null;
}
};
} catch (Exception e)
{
throw new RDFException(e);
}
}
public CloseableIterator<Statement> listStatements() throws RDFException
{
try {
return new CursorIterator<Statement>(this.triplesDB.openCursor(null, null))
{
@Override
protected Statement readNext(DatabaseEntry key,DatabaseEntry
value) throws DatabaseException
{
if(this.cursor.getNext(key, value, null)==OperationStatus.SUCCESS)
{
return STMT_VALUE_BINDING.entryToObject(value);
}
return null;
}
};
} catch (DatabaseException e)
{
throw new RDFException(e);
}
}
public CloseableIterator<Statement> listStatements(
Resource s,
Resource p,
RDFNode o) throws RDFException
{
if(s==null && p==null && o==null)
{
return listStatements();
}
try
{
List<CursorAndEntries> cursors= new ArrayList<CursorAndEntries>(3);
if(s!=null)
{
CursorAndEntries ca= new CursorAndEntries();
ca.cursor=this.subject2triple.openCursor(null, null);
RSRC_KEY_BINDING.objectToEntry(s, ca.keyEntry);
cursors.add(ca);
}
if(p!=null)
{
CursorAndEntries ca= new CursorAndEntries();
ca.cursor=this.predicate2triple.openCursor(null, null);
RSRC_KEY_BINDING.objectToEntry(p, ca.keyEntry);
cursors.add(ca);
}
if(o!=null)
{
if(o.isResource())
{
CursorAndEntries ca= new CursorAndEntries();
ca.cursor=this.objectRsrc2triple.openCursor(null, null);
RSRC_KEY_BINDING.objectToEntry(Resource.class.cast(o), ca.keyEntry);
cursors.add(ca);
}
else
{
CursorAndEntries ca= new CursorAndEntries();
ca.cursor=this.objectLiteral2triple.openCursor(null, null);
LITERAL_KEY_BINDING.objectToEntry(Literal.class.cast(o), ca.keyEntry);
cursors.add(ca);
}
}
return new JoinIterator(cursors);
}
catch (DatabaseException e)
{
throw new RDFException(e);
}
}
public Resource createResource(String uri)
{
return new Resource(uri);
}
public Resource createResource(URI uri)
{
return createResource(uri.toString());
}
public Literal createLiteral(String text)
{
return new Literal(text);
}
private static void saveResource(Resource rsrc,TupleOutput out)
{
out.writeString(rsrc.getURI());
}
private static Resource readResource(TupleInput in)
{
return new Resource(in.readString());
}
private static void saveLiteral(Literal literal,TupleOutput out)
{
out.writeString(literal.getLexicalForm());
}
private static Literal readLiteral(TupleInput in)
{
String s= in.readString();
return new Literal(s);
}
public long size()throws RDFException
{
try
{
return getTripleDB().count();
}catch(DatabaseException err)
{
throw new RDFException(err);
}
}
public boolean contains(Statement stmt ) throws RDFException
{
CloseableIterator<Statement> iter=null;
try {
iter= listStatements(stmt.getSubject(), stmt.getPredicate(),stmt.getValue());
return (iter.hasNext());
} catch (RDFException e) {
throw e;
}
finally
{
if(iter!=null) iter.close();
}
}
public BerkeleyDBModel add(Resource s,Resource p,RDFNode o) throws RDFException
{
return add(new Statement(s,p,o));
}
public BerkeleyDBModel add(Statement stmt) throws RDFException
{
DatabaseEntry key= new DatabaseEntry();
DatabaseEntry value= new DatabaseEntry();
Cursor c=null;
try
{
if(contains(stmt)) return this;
c= triplesDB.openCursor(null, null);
int id=0;
if(c.getLast(key, value, null)==OperationStatus.SUCCESS)
{
id= IntegerBinding.entryToInt(key);
}
STMT_VALUE_BINDING.objectToEntry(stmt, value);
IntegerBinding.intToEntry(id+1,key);
getTripleDB().put(null, key, value);
return this;
}
catch(DatabaseException error)
{
throw new RDFException(error);
}
finally
{
if(c!=null) try {c.close(); } catch(DatabaseException err) {}
}
}
public void read(InputStream in) throws
IOException,RDFException,XMLStreamException
{
org.lindenb.sw.io.RDFHandler h= new RDFHandler()
{
@Override
public void found(URI subject, URI predicate, Object value,
URI dataType, String lang, int index) {
try
{
if(value instanceof URI)
{
add(createResource(subject),createResource(predicate),createResource((URI)value));
}
else
{
add(createResource(subject),createResource(predicate),createLiteral((String)value));
}
} catch(RDFException err)
{
throw new RuntimeException(err);
}
}
};
h.parse(in);
}
public static void main(String[] args) {
BerkeleyDBModel rdfStore= null;
try {
URL url= new
URL("http://archive.geneontology.org/latest-lite/go_20090607-termdb.owl.gz");
rdfStore = new BerkeleyDBModel(new File("/tmp/rdfdb"));
for(int i=0;i<10;i++)
{
long now= System.currentTimeMillis();
rdfStore.clear();
InputStream in= new GZIPInputStream(url.openStream());
rdfStore.read(in);
in.close();
System.err.println("("+i+") size="+rdfStore.size()+" time="+(System.currentTimeMillis()-now)/1000+"s");
}
rdfStore.clear();
}
catch (Exception e) {
e.printStackTrace();
}
finally
{
if(rdfStore!=null) try { rdfStore.close(); } catch(Exception err) {err.printStackTrace();}
}
}
}
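To make the layout easier to play with without installing BerkeleyDB, here is a minimal in-memory sketch of the same idea: one primary table keyed by a unique number, one index per component of the statement, and a "join" that is just the intersection of the ids found in each index. Everything in it is an assumption made for illustration: the class name `TripleIndexSketch`, the use of plain strings for resources and literals, and the single object index (the model above keeps two, one for resources and one for literals). It is not the BerkeleyDB API, only a stand-in built on `java.util` collections.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical in-memory stand-in: a primary table plus one index per component. */
class TripleIndexSketch {
    /** primary "database": unique id -> {subject, predicate, object} */
    private final TreeMap<Integer, String[]> triples = new TreeMap<>();
    /** secondary "databases": component value -> ids of the triples containing it */
    private final Map<String, TreeSet<Integer>> bySubject = new HashMap<>();
    private final Map<String, TreeSet<Integer>> byPredicate = new HashMap<>();
    private final Map<String, TreeSet<Integer>> byObject = new HashMap<>();

    /** Stores a triple under (last id + 1) unless it is already there; returns its id. */
    int add(String s, String p, String o) {
        List<Integer> existing = find(s, p, o);
        if (!existing.isEmpty()) return existing.get(0);
        int id = triples.isEmpty() ? 1 : triples.lastKey() + 1;
        triples.put(id, new String[] { s, p, o });
        bySubject.computeIfAbsent(s, k -> new TreeSet<>()).add(id);
        byPredicate.computeIfAbsent(p, k -> new TreeSet<>()).add(id);
        byObject.computeIfAbsent(o, k -> new TreeSet<>()).add(id);
        return id;
    }

    /** Returns the sorted ids of the triples matching the pattern; null means "any". */
    List<Integer> find(String s, String p, String o) {
        // start from every id, then keep only the ids present in each requested index
        TreeSet<Integer> ids = new TreeSet<>(triples.keySet());
        if (s != null) ids.retainAll(lookup(bySubject, s));
        if (p != null) ids.retainAll(lookup(byPredicate, p));
        if (o != null) ids.retainAll(lookup(byObject, o));
        return new ArrayList<>(ids);
    }

    /** Returns the stored {subject, predicate, object} for an id, or null. */
    String[] get(int id) {
        return triples.get(id);
    }

    private static Set<Integer> lookup(Map<String, TreeSet<Integer>> index, String key) {
        TreeSet<Integer> ids = index.get(key);
        return ids == null ? Collections.<Integer>emptySet() : ids;
    }
}
```

Calling `find("ex:john", null, null)` only touches the subject index, while `find("ex:john", "foaf:name", null)` intersects two indexes, which is roughly the job the join cursor does for the model above. The sketch copies all the ids before intersecting, which is fine for a toy but is not how a real engine would do it.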
That's it
Pierre
1 comment:
Jena once had a BDB storage backend; they (HP) dropped it later. Search Google for the com.hp.hpl.mesa.rdf.jena.bdb package.