Skip to content

Commit 73a2a6f

Browse files
author
Chris Elder
committed
FAB-2959 Add CouchDB batch update operations
This is change 1 of 4 for FAB-2725 CouchDB optimizations Motivation for this change: Interactions with CouchDB are currently done individually. Need to switch to using bulk operations to get optimal performance from CouchDB. Need to performance test and stress test. - Add bulk update methods to couchdb Change-Id: Iabd673c9a56fb3d1e25ad7e869c787ebcf6f0bb4 Signed-off-by: Chris Elder <[email protected]>
1 parent fb7727d commit 73a2a6f

File tree

2 files changed

+252
-7
lines changed

2 files changed

+252
-7
lines changed

core/ledger/util/couchdb/couchdb.go

+126-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package couchdb
1919
import (
2020
"bytes"
2121
"compress/gzip"
22+
"encoding/base64"
2223
"encoding/json"
2324
"fmt"
2425
"io"
@@ -157,10 +158,11 @@ type Attachment struct {
157158
AttachmentBytes []byte
158159
}
159160

160-
//DocRev returns the Id and revision for a couchdb document
161-
type DocRev struct {
162-
Id string `json:"_id"`
163-
Rev string `json:"_rev"`
161+
//DocMetadata returns the ID, version and revision for a couchdb document
162+
type DocMetadata struct {
163+
ID string
164+
Rev string
165+
Version string
164166
}
165167

166168
//FileDetails defines the structure needed to send an attachment to couchdb
@@ -176,6 +178,33 @@ type CouchDoc struct {
176178
Attachments []Attachment
177179
}
178180

181+
//BatchRetrieveDocMedatadataResponse is used for processing REST batch responses from CouchDB
182+
type BatchRetrieveDocMedatadataResponse struct {
183+
Rows []struct {
184+
ID string `json:"id"`
185+
Doc struct {
186+
ID string `json:"_id"`
187+
Rev string `json:"_rev"`
188+
Version string `json:"version"`
189+
} `json:"doc"`
190+
} `json:"rows"`
191+
}
192+
193+
//BatchUpdateResponse defines a structure for batch update response
194+
type BatchUpdateResponse struct {
195+
ID string `json:"id"`
196+
Error string `json:"error"`
197+
Reason string `json:"reason"`
198+
Ok bool `json:"ok"`
199+
Rev string `json:"rev"`
200+
}
201+
202+
//Base64Attachment contains the definition for an attached file for couchdb
203+
type Base64Attachment struct {
204+
ContentType string `json:"content_type"`
205+
AttachmentData string `json:"data"`
206+
}
207+
179208
//CreateConnectionDefinition for a new client connection
180209
func CreateConnectionDefinition(couchDBAddress, username, password string) (*CouchConnectionDef, error) {
181210

@@ -568,6 +597,7 @@ func getRevisionHeader(resp *http.Response) (string, error) {
568597
//ReadDoc method provides function to retrieve a document from the database by id
569598
func (dbclient *CouchDatabase) ReadDoc(id string) (*CouchDoc, string, error) {
570599
var couchDoc CouchDoc
600+
571601
logger.Debugf("Entering ReadDoc() id=[%s]", id)
572602
if !utf8.ValidString(id) {
573603
return nil, "", fmt.Errorf("doc id [%x] not a valid utf8 string", id)
@@ -930,6 +960,98 @@ func (dbclient *CouchDatabase) QueryDocuments(query string) (*[]QueryResult, err
930960

931961
}
932962

963+
//BatchUpdateDocuments - batch method to batch update documents
964+
func (dbclient *CouchDatabase) BatchUpdateDocuments(documents []*CouchDoc) ([]*BatchUpdateResponse, error) {
965+
966+
logger.Debugf("Entering BatchUpdateDocuments() documents=%v", documents)
967+
968+
batchURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
969+
if err != nil {
970+
logger.Errorf("URL parse error: %s", err.Error())
971+
return nil, err
972+
}
973+
batchURL.Path = dbclient.DBName + "/_bulk_docs"
974+
975+
documentMap := make(map[string]interface{})
976+
977+
var jsonDocumentMap []interface{}
978+
979+
for _, jsonDocument := range documents {
980+
981+
//create a document map
982+
var document = make(map[string]interface{})
983+
984+
//unmarshal the JSON component of the CouchDoc into the document
985+
json.Unmarshal(jsonDocument.JSONValue, &document)
986+
987+
//iterate through any attachments
988+
if len(jsonDocument.Attachments) > 0 {
989+
990+
//create a file attachment map
991+
fileAttachment := make(map[string]interface{})
992+
993+
//for each attachment, create a Base64Attachment, name the attachment,
994+
//add the content type and base64 encode the attachment
995+
for _, attachment := range jsonDocument.Attachments {
996+
fileAttachment[attachment.Name] = Base64Attachment{attachment.ContentType,
997+
base64.StdEncoding.EncodeToString(attachment.AttachmentBytes)}
998+
}
999+
1000+
//add attachments to the document
1001+
document["_attachments"] = fileAttachment
1002+
1003+
}
1004+
1005+
//Append the document to the map of documents
1006+
jsonDocumentMap = append(jsonDocumentMap, document)
1007+
1008+
}
1009+
1010+
//Add the documents to the "docs" item
1011+
documentMap["docs"] = jsonDocumentMap
1012+
1013+
jsonKeys, err := json.Marshal(documentMap)
1014+
1015+
if err != nil {
1016+
return nil, err
1017+
}
1018+
1019+
//Set up a buffer for the data to be pushed to couchdb
1020+
data := new(bytes.Buffer)
1021+
1022+
data.ReadFrom(bytes.NewReader(jsonKeys))
1023+
1024+
resp, _, err := dbclient.CouchInstance.handleRequest(http.MethodPost, batchURL.String(), data, "", "")
1025+
if err != nil {
1026+
return nil, err
1027+
}
1028+
defer resp.Body.Close()
1029+
1030+
if logger.IsEnabledFor(logging.DEBUG) {
1031+
dump, _ := httputil.DumpResponse(resp, false)
1032+
// compact debug log by replacing carriage return / line feed with dashes to separate http headers
1033+
logger.Debugf("HTTP Response: %s", bytes.Replace(dump, []byte{0x0d, 0x0a}, []byte{0x20, 0x7c, 0x20}, -1))
1034+
}
1035+
1036+
//handle as JSON document
1037+
jsonResponseRaw, err := ioutil.ReadAll(resp.Body)
1038+
if err != nil {
1039+
return nil, err
1040+
}
1041+
1042+
var jsonResponse = []*BatchUpdateResponse{}
1043+
1044+
err2 := json.Unmarshal(jsonResponseRaw, &jsonResponse)
1045+
if err2 != nil {
1046+
return nil, err2
1047+
}
1048+
1049+
logger.Debugf("Exiting BatchUpdateDocuments()")
1050+
1051+
return jsonResponse, nil
1052+
1053+
}
1054+
9331055
//handleRequest method is a generic http request handler
9341056
func (couchInstance *CouchInstance) handleRequest(method, connectURL string, data io.Reader, rev string, multipartBoundary string) (*http.Response, *DBReturn, error) {
9351057

core/ledger/util/couchdb/couchdb_test.go

+126-3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ var badConnectURL = "couchdb:5990"
3535
var username = ""
3636
var password = ""
3737

38+
const updateDocumentConflictError = "conflict"
39+
const updateDocumentConflictReason = "Document update conflict."
40+
3841
func cleanup(database string) error {
3942
//create a new connection
4043
couchInstance, err := CreateCouchInstance(connectURL, username, password)
@@ -125,7 +128,7 @@ func TestDBCreateEnsureFullCommit(t *testing.T) {
125128
//create a new instance and database object
126129
couchInstance, err := CreateCouchInstance(connectURL, username, password)
127130
testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to create couch instance"))
128-
db := CouchDatabase{couchInstance: *couchInstance, dbName: database}
131+
db := CouchDatabase{CouchInstance: *couchInstance, DBName: database}
129132

130133
//create a new database
131134
_, errdb := db.CreateDatabaseIfNotExist()
@@ -142,6 +145,7 @@ func TestDBCreateEnsureFullCommit(t *testing.T) {
142145
}
143146
}
144147
}
148+
145149
func TestDBBadDatabaseName(t *testing.T) {
146150

147151
if ledgerconfig.IsCouchDBEnabled() == true {
@@ -318,7 +322,7 @@ func TestDBBadJSON(t *testing.T) {
318322
}
319323

320324
func TestPrefixScan(t *testing.T) {
321-
if !ledgerconfig.IsCouchDBEnabled() {
325+
if !ledgerconfig.IsCouchDBEnabled() == true {
322326
return
323327
}
324328
database := "testprefixscan"
@@ -612,7 +616,7 @@ func TestRichQuery(t *testing.T) {
612616
//create a new instance and database object --------------------------------------------------------
613617
couchInstance, err := CreateCouchInstance(connectURL, username, password)
614618
testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to create couch instance"))
615-
db := CouchDatabase{couchInstance: *couchInstance, dbName: database}
619+
db := CouchDatabase{CouchInstance: *couchInstance, DBName: database}
616620

617621
//create a new database
618622
_, errdb := db.CreateDatabaseIfNotExist()
@@ -772,3 +776,122 @@ func TestRichQuery(t *testing.T) {
772776
}
773777
}
774778
}
779+
780+
func TestBatchCreateRetrieve(t *testing.T) {
781+
782+
if ledgerconfig.IsCouchDBEnabled() == true {
783+
784+
byteJSON01 := []byte(`{"_id":"marble01","asset_name":"marble01","color":"blue","size":"1","owner":"jerry"}`)
785+
byteJSON02 := []byte(`{"_id":"marble02","asset_name":"marble02","color":"red","size":"2","owner":"tom"}`)
786+
byteJSON03 := []byte(`{"_id":"marble03","asset_name":"marble03","color":"green","size":"3","owner":"jerry"}`)
787+
byteJSON04 := []byte(`{"_id":"marble04","asset_name":"marble04","color":"purple","size":"4","owner":"tom"}`)
788+
byteJSON05 := []byte(`{"_id":"marble05","asset_name":"marble05","color":"blue","size":"5","owner":"jerry"}`)
789+
790+
attachment1 := &Attachment{}
791+
attachment1.AttachmentBytes = []byte(`marble01 - test attachment`)
792+
attachment1.ContentType = "application/octet-stream"
793+
attachment1.Name = "data"
794+
attachments1 := []Attachment{}
795+
attachments1 = append(attachments1, *attachment1)
796+
797+
attachment2 := &Attachment{}
798+
attachment2.AttachmentBytes = []byte(`marble02 - test attachment`)
799+
attachment2.ContentType = "application/octet-stream"
800+
attachment2.Name = "data"
801+
attachments2 := []Attachment{}
802+
attachments2 = append(attachments2, *attachment2)
803+
804+
attachment3 := &Attachment{}
805+
attachment3.AttachmentBytes = []byte(`marble03 - test attachment`)
806+
attachment3.ContentType = "application/octet-stream"
807+
attachment3.Name = "data"
808+
attachments3 := []Attachment{}
809+
attachments3 = append(attachments3, *attachment3)
810+
811+
attachment4 := &Attachment{}
812+
attachment4.AttachmentBytes = []byte(`marble04 - test attachment`)
813+
attachment4.ContentType = "application/octet-stream"
814+
attachment4.Name = "data"
815+
attachments4 := []Attachment{}
816+
attachments4 = append(attachments4, *attachment4)
817+
818+
attachment5 := &Attachment{}
819+
attachment5.AttachmentBytes = []byte(`marble05 - test attachment`)
820+
attachment5.ContentType = "application/octet-stream"
821+
attachment5.Name = "data"
822+
attachments5 := []Attachment{}
823+
attachments5 = append(attachments5, *attachment5)
824+
825+
database := "testbatch"
826+
err := cleanup(database)
827+
testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to cleanup Error: %s", err))
828+
defer cleanup(database)
829+
830+
//create a new instance and database object --------------------------------------------------------
831+
couchInstance, err := CreateCouchInstance(connectURL, username, password)
832+
testutil.AssertNoError(t, err, fmt.Sprintf("Error when trying to create couch instance"))
833+
db := CouchDatabase{CouchInstance: *couchInstance, DBName: database}
834+
835+
//create a new database
836+
_, errdb := db.CreateDatabaseIfNotExist()
837+
testutil.AssertNoError(t, errdb, fmt.Sprintf("Error when trying to create database"))
838+
839+
batchUpdateDocs := []*CouchDoc{}
840+
841+
value1 := &CouchDoc{JSONValue: byteJSON01, Attachments: attachments1}
842+
value2 := &CouchDoc{JSONValue: byteJSON02, Attachments: attachments2}
843+
value3 := &CouchDoc{JSONValue: byteJSON03, Attachments: attachments3}
844+
value4 := &CouchDoc{JSONValue: byteJSON04, Attachments: attachments4}
845+
value5 := &CouchDoc{JSONValue: byteJSON05, Attachments: attachments5}
846+
847+
batchUpdateDocs = append(batchUpdateDocs, value1)
848+
batchUpdateDocs = append(batchUpdateDocs, value2)
849+
batchUpdateDocs = append(batchUpdateDocs, value3)
850+
batchUpdateDocs = append(batchUpdateDocs, value4)
851+
batchUpdateDocs = append(batchUpdateDocs, value5)
852+
853+
batchUpdateResp, err := db.BatchUpdateDocuments(batchUpdateDocs)
854+
testutil.AssertNoError(t, err, fmt.Sprintf("Error when attempting to update a batch of documents"))
855+
856+
//check to make sure each batch update response was successful
857+
for _, updateDoc := range batchUpdateResp {
858+
testutil.AssertEquals(t, updateDoc.Ok, true)
859+
}
860+
861+
//----------------------------------------------
862+
//Test Retrieve JSON
863+
dbGetResp, _, geterr := db.ReadDoc("marble01")
864+
testutil.AssertNoError(t, geterr, fmt.Sprintf("Error when attempting read a document"))
865+
866+
assetResp := &Asset{}
867+
geterr = json.Unmarshal(dbGetResp.JSONValue, &assetResp)
868+
testutil.AssertNoError(t, geterr, fmt.Sprintf("Error when trying to retrieve a document"))
869+
//Verify the owner retrieved matches
870+
testutil.AssertEquals(t, assetResp.Owner, "jerry")
871+
872+
//----------------------------------------------
873+
//Test retrieve binary
874+
dbGetResp, _, geterr = db.ReadDoc("marble03")
875+
testutil.AssertNoError(t, geterr, fmt.Sprintf("Error when attempting read a document"))
876+
//Retrieve the attachments
877+
attachments := dbGetResp.Attachments
878+
//Only one was saved, so take the first
879+
retrievedAttachment := attachments[0]
880+
//Verify the text matches
881+
testutil.AssertEquals(t, attachment3.AttachmentBytes, retrievedAttachment.AttachmentBytes)
882+
//----------------------------------------------
883+
//Test Bad Updates
884+
batchUpdateDocs = []*CouchDoc{}
885+
batchUpdateDocs = append(batchUpdateDocs, value1)
886+
batchUpdateDocs = append(batchUpdateDocs, value2)
887+
batchUpdateResp, err = db.BatchUpdateDocuments(batchUpdateDocs)
888+
testutil.AssertNoError(t, err, fmt.Sprintf("Error when attempting to update a batch of documents"))
889+
//No revision was provided, so these two updates should fail
890+
//Verify that the "Ok" field is returned as false
891+
for _, updateDoc := range batchUpdateResp {
892+
testutil.AssertEquals(t, updateDoc.Ok, false)
893+
testutil.AssertEquals(t, updateDoc.Error, updateDocumentConflictError)
894+
testutil.AssertEquals(t, updateDoc.Reason, updateDocumentConflictReason)
895+
}
896+
}
897+
}

0 commit comments

Comments
 (0)