I am trying to use a batch statement instead of a single bound insert statement.
Even though this is a very small change, it fails, and I am now looking for a good way to handle the errors and to find out which part is causing the issue.
One issue is definitely that the Java API has a getStatements method, which is missing in the C# driver.
The pseudo code looks like this:
// Batch currently being filled. OnEvent adds bound inserts to it and replaces it
// with a fresh instance when the accumulated batch is handed off for execution.
private BatchStatement batchStatement = new BatchStatement();
// Modulus OnEvent applies to the event counter to decide when to send the batch
// (the send happens when i++ % blockingFactor == 0).
private const int blockingFactor = 5;
// Event counter, post-incremented by OnEvent on every call that reaches the flush check.
private int i = 0;
// Monitor used to guard batchStatement. Marked readonly so the lock target can never
// be reassigned while another thread is synchronizing on the previous instance.
private readonly object locker = new object();
/// <summary>
/// Builds a cluster for the "localhost" contact point, opens a session on the
/// "my_keyspace" keyspace, and prepares the insert statement for the ticks table.
/// </summary>
public CassandraBufferHandler()
{
    // NOTE(review): this is a conditional insert (IF NOT EXISTS). OnEvent later adds
    // bound copies of it to a BatchStatement; Cassandra restricts conditional
    // statements inside one batch to a single partition - confirm that the ticks
    // batched together share a partition key.
    const string insertCql = "Insert into ticks (instrumentcode, timestamp, type, exchange, price, volume) values(?,?,?,?,?,?) if not exists;";

    var clusterBuilder = Cluster.Builder().AddContactPoints("localhost");
    Cluster = clusterBuilder.Build();
    Session = Cluster.Connect("my_keyspace");
    InsertStatement = Session.Prepare(insertCql);
}
public void OnEvent(TickCassandra tickCassandra, long sequence, bool endOfBatch)
{
try
{
lock (locker)
batchStatement.Add(
InsertStatement.Bind(tickCassandra.Instrumentcode,
tickCassandra.Timestamp,
tickCassandra.Type,
tickCassandra.Exchange,
tickCassandra.Price,
tickCassandra.Volume));
if (i++ % blockingFactor == 0)
{
BatchStatement tmp;
lock (locker)
{
tmp = batchStatement;
tmp.EnableTracing();
batchStatement = new BatchStatement();
}
Session.ExecuteAsync(tmp).ContinueWith(t =>
{
if (t.Exception != null)
{
ErrorCount++;
Log.Error(t.Exception.Message + tmp.ToString());
}
else
InsertCount++;
});
}
}
catch (Exception ex)
{
Log.Error("Exception:" + ex);
Active = false;
}