1 /**
2 * Copyright 2006 Steve Molloy
3 *
4 * This file is part of OV4J.
5 *
6 * OV4J is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as
7 * published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version.
8 *
9 * OV4J is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
10 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License along with OV4J; if not, write to the Free Software
13 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
14 *
15 */
16 package org.ov4j.hessianImpl;
17
18 import java.io.BufferedInputStream;
19 import java.io.BufferedOutputStream;
20 import java.io.File;
21 import java.io.FileInputStream;
22 import java.io.FileOutputStream;
23 import java.io.IOException;
24 import java.io.Serializable;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import java.util.Comparator;
29 import java.util.ConcurrentModificationException;
30 import java.util.Timer;
31 import java.util.TimerTask;
32 import java.util.TreeMap;
33 import java.util.TreeSet;
34 import java.util.logging.Level;
35 import java.util.logging.Logger;
36
37 import org.ov4j.Config;
38 import org.ov4j.IContainer;
39 import org.ov4j.LoadThread;
40 import org.ov4j.data.Item;
41 import org.ov4j.data.Version;
42
43 import com.caucho.hessian.io.HessianInput;
44 import com.caucho.hessian.io.HessianOutput;
45
46 /**
47 * @author smolloy
48 *
49 */
50 public class Container<T extends Comparable<? super T> & Cloneable & Serializable, C extends Comparable<? super C>>
51 implements IContainer<T, C> {
52 /**
53 * Logger for this class
54 */
55 private static final Logger logger = Logger.getLogger(Container.class.getName());
56
57 /** Empty Comparable array. */
58 protected static final Comparable<?>[] EMPTY_COMPARABLE_ARRAY = new Comparable[0];
59
60 /** Folder name where to store items. */
61 protected static final String ITEM_FOLDER = "Items";
62
63 /** Folder name where to store versions. */
64 protected static final String VERSION_FOLDER = "_Versions";
65
66 /** Folder name where to store deleted ids. */
67 protected static final String DELETED_FOLDER = "Deleted";
68
69 /** Folder name where to store live data. */
70 protected static final String LIVE_FOLDER = "Live";
71
72 /** Folder name where to store work data not yet committed. */
73 protected static final String WORK_FOLDER = "Work";
74
75 /** FileFilter accepting folder files. */
76 protected static final DirFileFilter FOLDER_FILTER = new DirFileFilter();
77
78 /** Maximum number of items to cache. */
79 protected static final int MAX_ITEM_CACHED =
80 Integer.parseInt(Config
81 .getString("ov4j.hessian.cache.item.max"));
82
/** Maximum number of load threads started at once (also the chunk size used when loading or duplicating IDs). */
84 protected static final int MAX_LOADTHREAD =
85 Integer.parseInt(Config
86 .getString("ov4j.hessian.loadthread.max"));
87
88 /** Maximum number of save retries when encountering ConcurrentModificationException. */
89 protected static final int MAX_RETRY =
90 Integer.parseInt(Config
91 .getString("ov4j.hessian.save.retry.max"));
92
93 /** Maximum amount of time an item will stay cached before being uncached automatically. */
94 protected static final long MAX_CACHE_TIME =
95 Long.parseLong(Config
96 .getString("ov4j.hessian.cache.time.max"));
97
98 /** Timer performing periodic saves. */
99 private static Timer theTimer;
100
101 /**
102 * Cancel and nullify the timer.
103 */
104 private static void cancelTimer() {
105 if (Container.logger.isLoggable(Level.FINER)) {
106 Container.logger.entering("Container", "cancelTimer()", "start");
107 }
108
109 if (Container.theTimer != null) {
110 synchronized (Container.class) {
111 Container.theTimer.cancel();
112 Container.theTimer = null;
113 }
114 }
115
116 if (Container.logger.isLoggable(Level.FINER)) {
117 Container.logger.exiting("Container", "cancelTimer()", "end");
118 }
119 }
120
121 /**
122 * Make sure a Timer is available.
123 */
124 private static void ensureTimer() {
125 if (Container.logger.isLoggable(Level.FINER)) {
126 Container.logger.entering("Container", "ensureTimer()", "start");
127 }
128
129 if (Container.theTimer == null) {
130 synchronized (Container.class) {
131 Container.theTimer = new Timer();
132 }
133 }
134
135 if (Container.logger.isLoggable(Level.FINER)) {
136 Container.logger.exiting("Container", "ensureTimer()", "end");
137 }
138 }
139
140 /** Name of the DB. */
141 protected String dbName;
142
143 /** Flag to indicate if DB is closed. */
144 protected boolean closed;
145
146 /** Live items folder. */
147 protected File liveItems;
148
149 /** Live deleted ids folder. */
150 protected File liveDeleted;
151
152 /** Work items folder. */
153 protected File workItems;
154
155 /** Work deleted ids folder. */
156 protected File workDeleted;
157
158 /** Current live ID to filename Map. */
159 protected TreeMap<C, String> idMap;
160
161 /** Current live filename to ID map. */
162 protected TreeMap<String, C> revIdMap;
163
164 /** Current live deleted ID to filename Map. */
165 protected TreeMap<C, String> deletedMap;
166
167 /** Current work ID to filename Map. */
168 protected TreeMap<C, String> idMapWrk;
169
170 /** Current work filename to ID map. */
171 protected TreeMap<String, C> revIdMapWrk;
172
173 /** Current work deleted ID to filename Map. */
174 protected TreeMap<C, String> deletedMapWrk;
175
176 /** Recently loaded cache. */
177 protected TreeMap<C, Item<T, C>> recentMap;
178
179 /** IDs kept in cache. */
180 protected ArrayList<C> cachedIds;
181
182 /** Next filename to assign. */
183 protected long counter = -1L;
184
185 /** Lock used to avoid concurrent saving */
186 protected final Object saveLock = new Object();
187
188 /** List of pending tasks to execute. */
189 protected ArrayList<Runnable> pendingTasks = new ArrayList<Runnable>();
190
191 /** Flag for closing. */
192 protected boolean closing = false;
193
194 /** Flag for performing pending tasks. */
195 protected boolean runningPendingTasks = false;
196
197 /** Number of pending save or delete operations. */
198 protected int pendingSaves = 0;
199
200 /** Shutdown hook to make sure memory is saved to file when VM shuts down. */
201 protected transient Thread hookThread;
202
203 /** Thread executing the background tasks. */
204 private transient Thread taskExecutor;
205
206 /** Whether or not the task executor thread has already been started. */
207 private boolean taskExecutorStarted = false;
208
/**
 * Opens the database stored under the given folder, creating the folder layout when it is missing.
 * <p>
 * The constructor builds the live and work folders, loads the ID and deleted-ID maps from disk, optionally
 * schedules the periodic backup on the shared timer, prepares the task executor thread and registers a
 * shutdown hook that closes the container.
 *
 * @param dbName
 *            Name of the database (used as the base folder path).
 * @param delay
 *            Delay before the first backup; a negative value disables the periodic backup.
 * @param saveInterval
 *            Interval (in ms) between periodic replication to backup database; a negative value disables the
 *            periodic backup.
 * @throws IOException
 *             If the existing maps cannot be read from disk.
 */
public Container(final String dbName, final long delay, final long saveInterval) throws IOException {
    this.dbName = dbName;
    this.closed = false;

    // Base folder of the database.
    final File root = new File(dbName);
    root.mkdirs();

    // Folders holding committed data.
    final File liveRoot = new File(root, Container.LIVE_FOLDER);
    liveRoot.mkdir();
    this.liveItems = new File(liveRoot, Container.ITEM_FOLDER);
    this.liveItems.mkdir();
    this.liveDeleted = new File(liveRoot, Container.DELETED_FOLDER);
    this.liveDeleted.mkdir();

    // Folders holding data that has not been committed yet.
    final File workRoot = new File(root, Container.WORK_FOLDER);
    workRoot.mkdir();
    this.workItems = new File(workRoot, Container.ITEM_FOLDER);
    this.workItems.mkdir();
    this.workDeleted = new File(workRoot, Container.DELETED_FOLDER);
    this.workDeleted.mkdir();

    // In-memory cache and indexes.
    this.cachedIds = new ArrayList<C>();
    this.recentMap = new TreeMap<C, Item<T, C>>();
    this.idMap = new TreeMap<C, String>();
    this.revIdMap = new TreeMap<String, C>();
    this.deletedMap = new TreeMap<C, String>();
    this.idMapWrk = new TreeMap<C, String>();
    this.revIdMapWrk = new TreeMap<String, C>();
    this.deletedMapWrk = new TreeMap<C, String>();
    populateIdMap();
    populateDeletedMap();

    // Periodic backup: waits (in 30 second steps) until Config.checkMemory() succeeds, then backs up.
    final TimerTask periodicBackup = new TimerTask() {
        public void run() {
            for (;;) {
                if (Config.checkMemory()) {
                    break;
                }
                try {
                    Thread.sleep(30000);
                } catch (InterruptedException e) {
                    if (Container.logger.isLoggable(Level.FINE)) {
                        Container.logger.logp(Level.FINE, "Container", "BackupTask$TimerTask.run()",
                            "exception ignored", e);
                    }
                }
            }

            backup();
        }
    };
    Container.ensureTimer();
    final boolean periodic = delay >= 0 && saveInterval >= 0;
    if (periodic) {
        Container.theTimer.scheduleAtFixedRate(periodicBackup, delay, saveInterval);
    }

    initTaskExecutor();

    // Shutdown hook: closes the container without trying to unregister the hook that is running.
    this.hookThread = new Thread(new Runnable() {
        public void run() {
            try {
                close(false);
            } catch (IOException e) {
                if (Container.logger.isLoggable(Level.FINE)) {
                    Container.logger.logp(Level.FINE, "Container", "SaveHook$Runnable.run()", "exception ignored",
                        e);
                }
            }
        }
    });
    Runtime.getRuntime().addShutdownHook(this.hookThread);
}
292
293 /**
294 * Backup the data and ID map databases.
295 *
296 */
297 protected void backup() {
298 if (Container.logger.isLoggable(Level.FINER)) {
299 Container.logger.entering("Container", "backup()", "start");
300 }
301
302 if (!isClosed()) {
303 try {
304 commit();
305 final Container<T, C> bak = new Container<T, C>(dbName + ".bak", -1, -1);
306 bak.clear();
307 bak.duplicate(this);
308 bak.close();
309 } catch (final Throwable ex) {
310 if (Container.logger.isLoggable(Level.FINE)) {
311 Container.logger.logp(Level.FINE, "Container", "backup()", "exception ignored", ex);
312 }
313 }
314 }
315
316 if (Container.logger.isLoggable(Level.FINER)) {
317 Container.logger.exiting("Container", "backup()", "end");
318 }
319 }
320
321 /**
322 * @throws IOException
323 * @see org.ov4j.IContainer#batchDelete(org.ov4j.data.Item<T>[], boolean)
324 */
325 public void batchDelete(final Item<T, C>[] items, final boolean keepTrack) throws IOException {
326 if (Container.logger.isLoggable(Level.FINER)) {
327 Container.logger.entering("Container", "batchDelete(Item<T,C>[]=" + Arrays.toString(items) + ", boolean="
328 + keepTrack + ")", "start");
329 }
330
331 if (items != null && items.length > 0) {
332 for (int i = 0; i < items.length; i++) {
333 delete(items[i], keepTrack);
334 }
335 }
336
337 if (Container.logger.isLoggable(Level.FINER)) {
338 Container.logger.exiting("Container", "batchDelete(Item<T,C>[]=" + Arrays.toString(items) + ", boolean="
339 + keepTrack + ")", "end");
340 }
341 }
342
343 /**
344 * @see org.ov4j.IContainer#batchMerge(java.lang.Comparable[])
345 */
346 public void batchMerge(final C[] ids) throws IOException {
347 if (Container.logger.isLoggable(Level.FINER)) {
348 Container.logger.entering("Container", "batchMerge(C[]=" + Arrays.toString(ids) + ")", "start");
349 }
350
351 final Runnable t = new Runnable() {
352 public void run() {
353 synchronized (saveLock) {
354 try {
355 ArrayList<Item<T, C>> res = new ArrayList<Item<T, C>>();
356 Item<T, C>[] items = load(ids, true);
357 Collections.addAll(res, items);
358 Collections.sort(res, new Comparator<Item<T, C>>() {
359 public int compare(Item<T, C> it1, Item<T, C> it2) {
360 if (it1 == null) {
361 return (it2 == null) ? 0 : -1;
362 }
363
364 if (it2 == null) {
365 return 1;
366 }
367
368 return (it1.getId().compareTo(it2.getId()));
369 }
370 });
371 ArrayList<Item<T, C>> mergedList = new ArrayList<Item<T, C>>();
372 ArrayList<Item<T, C>> deletedList = new ArrayList<Item<T, C>>();
373 C lastId = null;
374 long lastTStamp = -1;
375 for (int i = 0; i < res.size(); i++) {
376 Item<T, C> current = res.get(i);
377 if (lastId == null || lastId.compareTo(current.getId()) != 0) {
378 mergedList.add(0, current);
379 lastId = current.getId();
380 lastTStamp = current.getModificationStamp();
381 } else {
382 if (current.getModificationStamp() > lastTStamp) {
383 deletedList.add(0, mergedList.remove(0));
384 } else {
385 deletedList.add(current);
386 }
387 }
388 }
389 batchSave(mergedList.toArray((Item<T, C>[]) Item.EMPTY_ARRAY), false);
390 batchDelete(deletedList.toArray((Item<T, C>[]) Item.EMPTY_ARRAY), false);
391 } catch (IOException e) {
392 if (Container.logger.isLoggable(Level.FINE)) {
393 Container.logger.logp(Level.FINE, "Container", "batchMerge(C[]=" + Arrays.toString(ids)
394 + ")$Runnable.run()", "exception ignored", e);
395 }
396 }
397 }
398 synchronized (pendingTasks) {
399 pendingSaves--;
400 }
401 }
402 };
403 synchronized (pendingTasks) {
404 pendingSaves++;
405 pendingTasks.add(t);
406 }
407
408 if (Container.logger.isLoggable(Level.FINER)) {
409 Container.logger.exiting("Container", "batchMerge(C[]=" + Arrays.toString(ids) + ")", "end");
410 }
411 }
412
413 /**
414 * @see org.ov4j.IContainer#batchSave(org.ov4j.data.Item<T>[], boolean)
415 */
416 public void batchSave(final Item<T, C>[] items, final boolean abortOnDuplicate) throws IOException {
417 if (Container.logger.isLoggable(Level.FINER)) {
418 Container.logger.entering("Container", "batchSave(Item<T,C>[]=" + Arrays.toString(items) + ", boolean="
419 + abortOnDuplicate + ")", "start");
420 }
421
422 if (items != null && items.length > 0) {
423 if (abortOnDuplicate) {
424 for (int i = 0; i < items.length; i++) {
425 if (inUse(items[i].getId())) {
426 if (Container.logger.isLoggable(Level.FINER)) {
427 Container.logger.exiting("Container", "batchSave(Item<T,C>[]=" + Arrays.toString(items)
428 + ", boolean=" + abortOnDuplicate + ")", "end");
429 }
430 return;
431 }
432 }
433 }
434 final ArrayList<Item<T, C>> list = new ArrayList<Item<T, C>>();
435 Collections.addAll(list, items);
436 while (list.size() > 0) {
437 save(list.remove(0));
438 }
439 }
440
441 if (Container.logger.isLoggable(Level.FINER)) {
442 Container.logger.exiting("Container", "batchSave(Item<T,C>[]=" + Arrays.toString(items) + ", boolean="
443 + abortOnDuplicate + ")", "end");
444 }
445 }
446
447 /**
448 * Add an item to the cache.
449 *
450 * @param id
451 * ID of the item.
452 * @param it
453 * Item to cache.
454 */
455 protected void cacheItem(final C id, final Item<T, C> it) {
456 if (Container.logger.isLoggable(Level.FINER)) {
457 Container.logger.entering("Container", "cacheItem(C=" + id + ", Item<T,C>=" + it + ")", "start");
458 }
459
460 synchronized (recentMap) {
461 if (cachedIds.size() > Container.MAX_ITEM_CACHED || !Config.enoughFree()) {
462 final Comparable<?> uncache = cachedIds.remove(0);
463 recentMap.remove(uncache);
464 }
465 recentMap.put(id, it);
466 cachedIds.add(id);
467 }
468 final TimerTask t = new TimerTask() {
469 public void run() {
470 uncacheItem(id);
471 }
472 };
473 Container.ensureTimer();
474 Container.theTimer.schedule(t, Container.MAX_CACHE_TIME);
475
476 if (Container.logger.isLoggable(Level.FINER)) {
477 Container.logger.exiting("Container", "cacheItem(C=" + id + ", Item<T,C>=" + it + ")", "end");
478 }
479 }
480
481 /**
482 * @see org.ov4j.IContainer#clear()
483 */
484 public void clear() {
485 if (Container.logger.isLoggable(Level.FINER)) {
486 Container.logger.entering("Container", "clear()", "start");
487 }
488
489 rollback();
490 cachedIds.clear();
491 recentMap.clear();
492 idMap.clear();
493 deletedMap.clear();
494 File[] files = liveItems.listFiles(Container.FOLDER_FILTER);
495 for (int i = 0; i < files.length; i++) {
496 final File[] subFiles = files[i].listFiles();
497 for (int j = 0; j < subFiles.length; j++) {
498 subFiles[j].delete();
499 }
500 }
501 files = liveItems.listFiles();
502 for (int i = 0; i < files.length; i++) {
503 files[i].delete();
504 }
505 files = liveDeleted.listFiles();
506 for (int i = 0; i < files.length; i++) {
507 files[i].delete();
508 }
509 Config.checkMemory();
510
511 if (Container.logger.isLoggable(Level.FINER)) {
512 Container.logger.exiting("Container", "clear()", "end");
513 }
514 }
515
516 /**
517 * @throws IOException
518 * @see org.ov4j.IContainer#close()
519 */
520 public synchronized void close() throws IOException {
521 if (Container.logger.isLoggable(Level.FINER)) {
522 Container.logger.entering("Container", "close()", "start");
523 }
524
525 close(true);
526
527 if (Container.logger.isLoggable(Level.FINER)) {
528 Container.logger.exiting("Container", "close()", "end");
529 }
530 }
531
532 /**
533 * @throws IOException
534 * @see org.ov4j.IContainer#close()
535 */
536 public void close(final boolean removeHook) throws IOException {
537 if (Container.logger.isLoggable(Level.FINER)) {
538 Container.logger.entering("Container", "close(boolean=" + removeHook + ")", "start");
539 }
540
541 waitForPendingTasks();
542 closing = true;
543 if (removeHook && hookThread != null) {
544 Runtime.getRuntime().removeShutdownHook(hookThread);
545 }
546 if (pendingTasks != null && pendingTasks.size() > 0) {
547 runningPendingTasks = true;
548 Runnable[] tasks = null;
549 synchronized (pendingTasks) {
550 tasks = pendingTasks.toArray(new Runnable[0]);
551 pendingTasks.clear();
552 }
553 for (int i = 0; i < tasks.length; i++) {
554 tasks[i].run();
555 }
556 runningPendingTasks = false;
557 }
558 Container.cancelTimer();
559 Config.checkMemory();
560
561 closed = true;
562
563 if (Container.logger.isLoggable(Level.FINER)) {
564 Container.logger.exiting("Container", "close(boolean=" + removeHook + ")", "end");
565 }
566 }
567
568 /**
569 * @see org.ov4j.IContainer#commit()
570 */
571 public void commit() {
572 if (Container.logger.isLoggable(Level.FINER)) {
573 Container.logger.entering("Container", "commit()", "start");
574 }
575
576 waitForPendingTasks();
577 synchronized (saveLock) {
578 if (!deletedMapWrk.isEmpty()) {
579 final C[] ids = deletedMapWrk.keySet().toArray((C[]) Container.EMPTY_COMPARABLE_ARRAY);
580 for (int i = 0; i < ids.length; i++) {
581 final String filename = deletedMapWrk.remove(ids[i]);
582 idMap.remove(ids[i]);
583 revIdMap.remove(filename);
584 deletedMap.put(ids[i], filename);
585 final File wrkFile = new File(workDeleted, filename);
586 final File liveFile = new File(liveDeleted, filename);
587 if (liveFile.exists()) {
588 liveFile.delete();
589 }
590 wrkFile.renameTo(liveFile);
591 final File liveItem = new File(liveItems, filename);
592 liveItem.delete();
593 final File verDir = new File(liveItems, filename + Container.VERSION_FOLDER);
594 final File[] liveVers = verDir.listFiles();
595 if (liveVers != null && liveVers.length > 0) {
596 for (int j = 0; j < liveVers.length; j++) {
597 liveVers[j].delete();
598 }
599 }
600 verDir.delete();
601 }
602 }
603 if (!idMapWrk.isEmpty()) {
604 final C[] ids = idMapWrk.keySet().toArray((C[]) Container.EMPTY_COMPARABLE_ARRAY);
605 for (int i = 0; i < ids.length; i++) {
606 final String filename = idMapWrk.remove(ids[i]);
607 idMap.put(ids[i], filename);
608 revIdMap.put(filename, ids[i]);
609 final File liveVerDir = new File(liveItems, filename + Container.VERSION_FOLDER);
610 if (liveVerDir.exists()) {
611 final File[] files = liveVerDir.listFiles();
612 if (files != null && files.length > 0) {
613 for (int j = 0; j < files.length; j++) {
614 files[j].delete();
615 }
616 }
617 liveVerDir.delete();
618 }
619 final File workVerDir = new File(workItems, filename + Container.VERSION_FOLDER);
620 if (workVerDir.exists()) {
621 workVerDir.renameTo(liveVerDir);
622 }
623 final File live = new File(liveItems, filename);
624 if (live.exists()) {
625 live.delete();
626 }
627 final File work = new File(workItems, filename);
628 if (work.exists()) {
629 work.renameTo(live);
630 }
631 }
632 }
633 }
634 Config.checkMemory();
635
636 if (Container.logger.isLoggable(Level.FINER)) {
637 Container.logger.exiting("Container", "commit()", "end");
638 }
639 }
640
641 /**
642 * @throws IOException
643 * @see org.ov4j.IContainer#delete(org.ov4j.data.Item, boolean)
644 */
645 public void delete(final Item<T, C> it, final boolean keepTrack) throws IOException {
646 if (Container.logger.isLoggable(Level.FINER)) {
647 Container.logger.entering("Container", "delete(Item<T,C>=" + it + ", boolean=" + keepTrack + ")", "start");
648 }
649
650 uncacheItem(it.getId());
651 final Runnable t = new Runnable() {
652 public void run() {
653 synchronized (saveLock) {
654 try {
655 if (it != null && inUse(it.getId())) {
656 String filename = idMap.get(it.getId());
657 idMapWrk.remove(it.getId());
658 revIdMapWrk.remove(filename);
659 if (keepTrack) {
660 deletedMapWrk.put(it.getId(), filename);
661 File file = new File(workDeleted, filename);
662 BufferedOutputStream fOut = new BufferedOutputStream(new FileOutputStream(file));
663 HessianOutput out = new HessianOutput(fOut);
664 out.writeObject(it.getId());
665 fOut.close();
666 }
667 }
668 } catch (IOException e) {
669 if (Container.logger.isLoggable(Level.FINE)) {
670 Container.logger.logp(Level.FINE, "Container", "delete(Item<T,C>=" + it + ", boolean="
671 + keepTrack + ")$Runnable.run()", "exception ignored", e);
672 }
673 }
674 }
675 synchronized (pendingTasks) {
676 pendingSaves--;
677 }
678 }
679 };
680 synchronized (pendingTasks) {
681 pendingSaves++;
682 pendingTasks.add(t);
683 }
684
685 if (Container.logger.isLoggable(Level.FINER)) {
686 Container.logger.exiting("Container", "delete(Item<T,C>=" + it + ", boolean=" + keepTrack + ")", "end");
687 }
688 }
689
690 /**
691 * @see org.ov4j.IContainer#deletedIds()
692 */
693 public C[] deletedIds() {
694 if (Container.logger.isLoggable(Level.FINER)) {
695 Container.logger.entering("Container", "deletedIds()", "start");
696 }
697
698 final C[] returnComparableArray = deletedMap.keySet().toArray((C[]) Container.EMPTY_COMPARABLE_ARRAY);
699
700 if (Container.logger.isLoggable(Level.FINER)) {
701 Container.logger.exiting("Container", "deletedIds()", "end - return value="
702 + Arrays.toString(returnComparableArray));
703 }
704 return returnComparableArray;
705 }
706
707 /**
708 * @throws IOException
709 * @see org.ov4j.IContainer#deletedIdsSince(long)
710 */
711 public C[] deletedIdsSince(final long timestamp) throws IOException {
712 if (Container.logger.isLoggable(Level.FINER)) {
713 Container.logger.entering("Container", "deletedIdsSince(long=" + timestamp + ")", "start");
714 }
715
716 final File[] files = liveDeleted.listFiles(new TStampFileFilter(timestamp));
717 final C[] ids = (C[]) new Comparable[files.length];
718 for (int i = 0; i < files.length; i++) {
719 ids[i] = filenameToId(files[i].getName());
720 }
721
722 if (Container.logger.isLoggable(Level.FINER)) {
723 Container.logger.exiting("Container", "deletedIdsSince(long=" + timestamp + ")", "end - return value="
724 + Arrays.toString(ids));
725 }
726 return ids;
727 }
728
729 /**
730 * Converts the given IContainer into an hessian implementation.
731 *
732 * @param cont
733 * IContainer to convert.
734 * @throws IOException
735 */
736 public void duplicate(final IContainer<T, C> cont) throws IOException {
737 if (Container.logger.isLoggable(Level.FINER)) {
738 Container.logger.entering("Container", "duplicate(IContainer<T,C>=" + cont + ")", "start");
739 }
740
741 final C[] ids = cont.listModifiedSince(-1);
742 if (ids != null) {
743 for (int startIdx = 0; startIdx < ids.length; startIdx += Container.MAX_LOADTHREAD) {
744 final C[] buf = (C[]) new Comparable[Math.min(Container.MAX_LOADTHREAD, (ids.length - startIdx))];
745 System.arraycopy(ids, startIdx, buf, 0, buf.length);
746 batchSave(cont.load(buf, true), false);
747 }
748 }
749 commit();
750
751 if (Container.logger.isLoggable(Level.FINER)) {
752 Container.logger.exiting("Container", "duplicate(IContainer<T,C>=" + cont + ")", "end");
753 }
754 }
755
756 /**
757 * Generate ID from filename.
758 *
759 * @param filename
760 * Filename from which to generate ID.
761 * @return ID generated
762 * @throws IOException
763 */
764 protected C filenameToId(final String filename) throws IOException {
765 if (Container.logger.isLoggable(Level.FINEST)) {
766 Container.logger.entering("Container", "filenameToId(String=" + filename + ")", "start");
767 }
768
769 final C returnComparable = revIdMap.get(filename);
770
771 if (Container.logger.isLoggable(Level.FINEST)) {
772 Container.logger.exiting("Container", "filenameToId(String=" + filename + ")", "end - return value="
773 + returnComparable);
774 }
775 return returnComparable;
776 }
777
778 /**
779 * Generate filename for given ID.
780 *
781 * @param id
782 * ID for which to generate a filename.
783 * @return Filename generated.
784 * @throws IOException
785 */
786 protected String idToFilename(final C id) throws IOException {
787 if (Container.logger.isLoggable(Level.FINEST)) {
788 Container.logger.entering("Container", "idToFilename(C=" + id + ")", "start");
789 }
790
791 if (idMap.containsKey(id)) {
792 final String returnString2 = idMap.get(id);
793
794 if (Container.logger.isLoggable(Level.FINEST)) {
795 Container.logger.exiting("Container", "idToFilename(C=" + id + ")", "end - return value="
796 + returnString2);
797 }
798 return returnString2;
799 }
800
801 while (new File(liveItems, String.valueOf(++counter)).exists()) {
802 ;
803 }
804 while (new File(workItems, String.valueOf(++counter)).exists()) {
805 ;
806 }
807
808 final String returnString = String.valueOf(counter);
809
810 if (Container.logger.isLoggable(Level.FINEST)) {
811 Container.logger.exiting("Container", "idToFilename(C=" + id + ")", "end - return value=" + returnString);
812 }
813 return returnString;
814 }
815
816 /**
817 * Initializ task executor thread.
818 */
819 private void initTaskExecutor() {
820 if (taskExecutor == null) {
821 taskExecutor = new Thread() {
822 public void run() {
823 while (!closing) {
824 if (pendingTasks != null && pendingTasks.size() > 0) {
825 runningPendingTasks = true;
826 Runnable[] tasks = null;
827 synchronized (pendingTasks) {
828 tasks = pendingTasks.toArray(new Runnable[0]);
829 pendingTasks.clear();
830 }
831 for (int i = 0; i < tasks.length; i++) {
832 tasks[i].run();
833 }
834 runningPendingTasks = false;
835 }
836 try {
837 Thread.sleep(10);
838 } catch (final InterruptedException e) {
839 if (Container.logger.isLoggable(Level.FINE)) {
840 Container.logger.logp(Level.FINE, "Container", "TaskExecutor$Thread.run()",
841 "exception ignored", e);
842 }
843 }
844 }
845 }
846 };
847 }
848 }
849
850 /**
851 * @see org.ov4j.IContainer#inUse(java.lang.Comparable)
852 */
853 public boolean inUse(final C id) {
854 if (Container.logger.isLoggable(Level.FINER)) {
855 Container.logger.entering("Container", "inUse(C=" + id + ")", "start");
856 }
857
858 final boolean returnboolean = idMap.containsKey(id);
859
860 if (Container.logger.isLoggable(Level.FINER)) {
861 Container.logger.exiting("Container", "inUse(C=" + id + ")", "end - return value=" + returnboolean);
862 }
863 return returnboolean;
864 }
865
866 /**
867 * @see org.ov4j.IContainer#isClosed()
868 */
869 public boolean isClosed() {
870 return closed;
871 }
872
873 /**
874 * @see org.ov4j.IContainer#listModifiedSince(long)
875 */
876 public C[] listModifiedSince(final long timestamp) throws IOException {
877 if (Container.logger.isLoggable(Level.FINER)) {
878 Container.logger.entering("Container", "listModifiedSince(long=" + timestamp + ")", "start");
879 }
880
881 final File[] files = liveItems.listFiles(new TStampFileFilter(timestamp));
882 final C[] ids = (C[]) new Comparable[files.length];
883 for (int i = 0; i < files.length; i++) {
884 ids[i] = filenameToId(files[i].getName());
885 }
886
887 if (Container.logger.isLoggable(Level.FINER)) {
888 Container.logger.exiting("Container", "listModifiedSince(long=" + timestamp + ")", "end - return value="
889 + Arrays.toString(ids));
890 }
891 return ids;
892 }
893
894 /**
895 * @see org.ov4j.IContainer#load(java.lang.Comparable, boolean)
896 */
897 @Override
898 public Item<T, C> load(final C id, final boolean allVersions) throws IOException {
899 if (Container.logger.isLoggable(Level.FINER)) {
900 Container.logger.entering("Container", "load(C=" + id + ", boolean=" + allVersions + ")", "start");
901 }
902
903 if (id == null) {
904 if (Container.logger.isLoggable(Level.FINER)) {
905 Container.logger.exiting("Container", "load(C=null, boolean=" + allVersions + ")",
906 "end - return value=null");
907 }
908 return null;
909 }
910
911 Item<T, C> it = null;
912 final String filename = idMap.get(id);
913 if (recentMap.containsKey(id)) {
914 it = recentMap.get(id);
915 }
916
917 if (it == null && idMap.containsKey(id)) {
918 final File tst = new File(liveItems, filename);
919 if (tst.exists()) {
920 final BufferedInputStream fIn = new BufferedInputStream(new FileInputStream(tst));
921 final HessianInput in = new HessianInput(fIn);
922 it = (Item<T, C>) in.readObject();
923 fIn.close();
924 }
925 }
926 if (it != null && allVersions) {
927 final Version<?>[] vers = it.getVersions();
928 if (vers == null || vers.length < 1) {
929 final File verFolder = new File(liveItems, filename + Container.VERSION_FOLDER);
930 if (verFolder.exists()) {
931 final File[] verFiles = verFolder.listFiles();
932 final TreeSet<Version<?>> verTree = new TreeSet<Version<?>>();
933 for (int i = 0; i < verFiles.length; i++) {
934 final BufferedInputStream fIn = new BufferedInputStream(new FileInputStream(verFiles[i]));
935 final HessianInput verIn = new HessianInput(fIn);
936 verTree.add((Version<T>) verIn.readObject());
937 fIn.close();
938 }
939 it.versions((Version<T>[]) verTree.toArray(Version.EMPTY_ARRAY));
940 }
941 }
942 }
943 if (it != null) {
944 cacheItem(id, it);
945 try {
946 it = it.clone();
947 } catch (final CloneNotSupportedException e) {
948 if (Container.logger.isLoggable(Level.FINE)) {
949 Container.logger.logp(Level.FINE, "Container", "load(C=null, boolean=" + allVersions + ")",
950 "Exception ignored", e);
951 }
952 }
953 }
954
955 if (Container.logger.isLoggable(Level.FINER)) {
956 Container.logger.exiting("Container", "load(C=" + id + ", boolean=" + allVersions + ")",
957 "end - return value=" + it);
958 }
959 return it;
960 }
961
962 /**
963 * @see org.ov4j.IContainer#load(java.lang.Comparable[], boolean)
964 */
965 public Item<T, C>[] load(final C[] ids, final boolean allVersions) throws IOException {
966 if (Container.logger.isLoggable(Level.FINER)) {
967 Container.logger.entering("Container", "load(C[]=" + Arrays.toString(ids) + ", boolean=" + allVersions
968 + ")", "start");
969 }
970
971 final ArrayList<Item<T, C>> list = new ArrayList<Item<T, C>>();
972 if (ids != null) {
973 for (int startIdx = 0; startIdx < ids.length; startIdx += Container.MAX_LOADTHREAD) {
974 final C[] buf = (C[]) new Comparable[Math.min(Container.MAX_LOADTHREAD, (ids.length - startIdx))];
975 System.arraycopy(ids, startIdx, buf, 0, buf.length);
976 final LoadThread<T, C>[] threads = new LoadThread[buf.length];
977 for (int i = 0; i < buf.length; i++) {
978 threads[i] = new LoadThread<T, C>(list, this, buf[i], allVersions);
979 threads[i].start();
980 }
981 for (int i = 0; i < threads.length; i++) {
982 try {
983 threads[i].join();
984 } catch (final InterruptedException e) {
985 if (Container.logger.isLoggable(Level.FINE)) {
986 Container.logger.logp(Level.FINE, "Container", "load(C[]=" + Arrays.toString(ids)
987 + ", boolean=" + allVersions + ")", "exception ignored", e);
988 }
989 }
990 }
991 }
992 }
993 final Item<T, C>[] returnItemArray = list.toArray((Item<T, C>[]) Item.EMPTY_ARRAY);
994
995 if (Container.logger.isLoggable(Level.FINER)) {
996 Container.logger.exiting("Container",
997 "load(C[]=" + Arrays.toString(ids) + ", boolean=" + allVersions + ")", "end - return value="
998 + Arrays.toString(returnItemArray));
999 }
1000 return returnItemArray;
1001 }
1002
1003 /**
1004 * @see org.ov4j.IContainer#modifiedSince(long, boolean)
1005 */
1006 public Item<T, C>[] modifiedSince(final long timestamp, final boolean allVersions) throws IOException {
1007 if (Container.logger.isLoggable(Level.FINER)) {
1008 Container.logger.entering("Container",
1009 "modifiedSince(long=" + timestamp + ", boolean=" + allVersions + ")", "start");
1010 }
1011
1012 final File[] files = liveItems.listFiles(new TStampFileFilter(timestamp));
1013 final ArrayList<Item<T, C>> list = new ArrayList<Item<T, C>>();
1014 for (int startIdx = 0; startIdx < files.length; startIdx += Container.MAX_LOADTHREAD) {
1015 final C[] buf = (C[]) new Comparable[Math.min(Container.MAX_LOADTHREAD, (files.length - startIdx))];
1016 for (int i = 0; i < buf.length; i++) {
1017 buf[i] = filenameToId(files[i + startIdx].getName());
1018 }
1019 final LoadThread<T, C>[] threads = new LoadThread[buf.length];
1020 for (int i = 0; i < buf.length; i++) {
1021 threads[i] = new LoadThread<T, C>(list, this, buf[i], allVersions);
1022 threads[i].start();
1023 }
1024 for (int i = 0; i < threads.length; i++) {
1025 try {
1026 threads[i].join();
1027 } catch (final InterruptedException e) {
1028 if (Container.logger.isLoggable(Level.FINE)) {
1029 Container.logger.logp(Level.FINE, "Container", "modifiedSince(long=" + timestamp + ", boolean="
1030 + allVersions + ")", "exception ignored", e);
1031 }
1032 }
1033 }
1034 }
1035 final Item<T, C>[] returnItemArray = list.toArray((Item<T, C>[]) Item.EMPTY_ARRAY);
1036
1037 if (Container.logger.isLoggable(Level.FINER)) {
1038 Container.logger.exiting("Container", "modifiedSince(long=" + timestamp + ", boolean=" + allVersions + ")",
1039 "end - return value=" + Arrays.toString(returnItemArray));
1040 }
1041 return returnItemArray;
1042 }
1043
1044 /**
1045 * Populate the deleted ID map from files.
1046 *
1047 * @throws IOException
1048 *
1049 */
1050 protected void populateDeletedMap() throws IOException {
1051 if (Container.logger.isLoggable(Level.FINER)) {
1052 Container.logger.entering("Container", "populateDeletedMap()", "start");
1053 }
1054
1055 final File[] files = liveDeleted.listFiles(new StdFileFilter());
1056 if (files != null) {
1057 for (int i = 0; i < files.length; i++) {
1058 final BufferedInputStream fIn = new BufferedInputStream(new FileInputStream(files[i]));
1059 final HessianInput in = new HessianInput(fIn);
1060 final C id = (C) in.readObject();
1061 deletedMap.put(id, files[i].getName());
1062 fIn.close();
1063 }
1064 }
1065
1066 if (Container.logger.isLoggable(Level.FINER)) {
1067 Container.logger.exiting("Container", "populateDeletedMap()", "end");
1068 }
1069 }
1070
1071 /**
1072 * Populate the ID map from files.
1073 *
1074 * @throws IOException
1075 *
1076 */
1077 protected void populateIdMap() throws IOException {
1078 if (Container.logger.isLoggable(Level.FINER)) {
1079 Container.logger.entering("Container", "populateIdMap()", "start");
1080 }
1081
1082 final File[] files = liveItems.listFiles(new StdFileFilter());
1083 if (files != null) {
1084 for (int i = 0; i < files.length; i++) {
1085 final long num = Long.parseLong(files[i].getName());
1086 if (num > counter) {
1087 counter = num + 1;
1088 }
1089 final BufferedInputStream fIn = new BufferedInputStream(new FileInputStream(files[i]));
1090 final HessianInput in = new HessianInput(fIn);
1091 final Item<T, C> it = (Item<T, C>) in.readObject();
1092 idMap.put(it.getId(), files[i].getName());
1093 revIdMap.put(files[i].getName(), it.getId());
1094 fIn.close();
1095 }
1096 }
1097
1098 if (Container.logger.isLoggable(Level.FINER)) {
1099 Container.logger.exiting("Container", "populateIdMap()", "end");
1100 }
1101 }
1102
1103 /**
1104 * @see org.ov4j.IContainer#release(java.lang.Object)
1105 */
1106 public void release(final Object obj) {
1107 if (Container.logger.isLoggable(Level.FINER)) {
1108 Container.logger.entering("Container", "release(Object=" + obj + ")", "start");
1109 }
1110
1111 if (obj instanceof Item) {
1112 uncacheItem(((Item<T, C>) obj).getId());
1113 }
1114
1115 if (Container.logger.isLoggable(Level.FINER)) {
1116 Container.logger.exiting("Container", "release(Object=" + obj + ")", "end");
1117 }
1118 }
1119
1120 /**
1121 * @see org.ov4j.IContainer#rollback()
1122 */
1123 public void rollback() {
1124 if (Container.logger.isLoggable(Level.FINER)) {
1125 Container.logger.entering("Container", "rollback()", "start");
1126 }
1127
1128 waitForPendingTasks();
1129 synchronized (saveLock) {
1130 recentMap.clear();
1131 cachedIds.clear();
1132 idMapWrk.clear();
1133 deletedMapWrk.clear();
1134 File[] files = workItems.listFiles(Container.FOLDER_FILTER);
1135 for (int i = 0; i < files.length; i++) {
1136 final File[] subFiles = files[i].listFiles();
1137 for (int j = 0; j < subFiles.length; j++) {
1138 subFiles[j].delete();
1139 }
1140 }
1141 files = workItems.listFiles();
1142 for (int i = 0; i < files.length; i++) {
1143 files[i].delete();
1144 }
1145 files = workDeleted.listFiles();
1146 for (int i = 0; i < files.length; i++) {
1147 files[i].delete();
1148 }
1149 }
1150 Config.checkMemory();
1151
1152 if (Container.logger.isLoggable(Level.FINER)) {
1153 Container.logger.exiting("Container", "rollback()", "end");
1154 }
1155 }
1156
1157 /**
1158 * @see org.ov4j.IContainer#save(org.ov4j.data.Item)
1159 */
1160 public void save(final Item<T, C> it) throws IOException {
1161 if (Container.logger.isLoggable(Level.FINER)) {
1162 Container.logger.entering("Container", "save(Item<T,C>=" + it + ")", "start");
1163 }
1164
1165 uncacheItem(it.getId());
1166 final Runnable t = new Runnable() {
1167 public void run() {
1168 synchronized (saveLock) {
1169 try {
1170 if (it != null && it.getId() != null) {
1171 int retry = 0;
1172 String filename = idToFilename(it.getId());
1173 Version<T>[] vers = it.getVersions();
1174 if (vers != null && vers.length > 0) {
1175 File folder = new File(workItems, filename + Container.VERSION_FOLDER);
1176 folder.mkdir();
1177 for (int i = 0; i < vers.length; i++) {
1178 while (retry < Container.MAX_RETRY) {
1179 try {
1180 BufferedOutputStream fOut =
1181 new BufferedOutputStream(new FileOutputStream(new File(folder, String
1182 .valueOf(vers[i].getVersionNumber()))));
1183 HessianOutput out = new HessianOutput(fOut);
1184 out.writeObject(vers[i]);
1185 fOut.close();
1186 break;
1187 } catch (Exception e) {
1188 if (Container.logger.isLoggable(Level.FINE)) {
1189 Container.logger.logp(Level.FINE, "Container", "save(Item<T,C>=" + it
1190 + ")$Runnable.run()", "Exception caught", e);
1191 }
1192
1193 retry++;
1194 }
1195 }
1196 }
1197 it.versions(null);
1198 }
1199 retry = 0;
1200 while (retry < Container.MAX_RETRY) {
1201 try {
1202 BufferedOutputStream fOut =
1203 new BufferedOutputStream(new FileOutputStream(new File(workItems, filename)));
1204 HessianOutput out = new HessianOutput(fOut);
1205 out.writeObject(it);
1206 fOut.close();
1207 break;
1208 } catch (ConcurrentModificationException e) {
1209 if (Container.logger.isLoggable(Level.FINE)) {
1210 Container.logger.logp(Level.FINE, "Container", "save(Item<T,C>=" + it
1211 + ")$Runnable.run()", "Exception caught", e);
1212 }
1213
1214 retry++;
1215 }
1216 }
1217 idMapWrk.put(it.getId(), filename);
1218 revIdMapWrk.put(filename, it.getId());
1219 }
1220 } catch (IOException e) {
1221 if (Container.logger.isLoggable(Level.FINE)) {
1222 Container.logger.logp(Level.FINE, "Container", "save(Item<T,C>=" + it + ")$Runnable.run()",
1223 "exception ignored", e);
1224 }
1225 }
1226 }
1227 synchronized (pendingTasks) {
1228 pendingSaves--;
1229 }
1230 }
1231 };
1232 synchronized (pendingTasks) {
1233 pendingSaves++;
1234 pendingTasks.add(t);
1235 }
1236
1237 if (Container.logger.isLoggable(Level.FINER)) {
1238 Container.logger.exiting("Container", "save(Item<T,C>=" + it + ")", "end");
1239 }
1240 }
1241
1242 /**
1243 * Returns a string representation of this Container.
1244 *
1245 * @return String representation of the Container.
1246 */
1247 public String toString() {
1248 return "Container{" + dbName + "}";
1249 }
1250
1251 /**
1252 * Remove an item from cache.
1253 *
1254 * @param id
1255 * ID of the item.
1256 */
1257 protected void uncacheItem(final C id) {
1258 if (Container.logger.isLoggable(Level.FINER)) {
1259 Container.logger.entering("Container", "uncacheItem(C=" + id + ")", "start");
1260 }
1261
1262 synchronized (recentMap) {
1263 if (recentMap.containsKey(id)) {
1264 recentMap.remove(id);
1265 cachedIds.remove(id);
1266 }
1267 }
1268
1269 if (Container.logger.isLoggable(Level.FINER)) {
1270 Container.logger.exiting("Container", "uncacheItem(C=" + id + ")", "end");
1271 }
1272 }
1273
1274 /**
1275 * @see org.ov4j.IContainer#waitForPendingSaves()
1276 */
1277 public void waitForPendingSaves() {
1278 if (Container.logger.isLoggable(Level.FINER)) {
1279 Container.logger.entering("Container", "waitForPendingSaves()", "start");
1280 }
1281
1282 if (!taskExecutorStarted) {
1283 synchronized (taskExecutor) {
1284 taskExecutorStarted = true;
1285 taskExecutor.start();
1286 }
1287 }
1288
1289 while (pendingSaves > 0) {
1290 try {
1291 Thread.sleep(100);
1292 } catch (final InterruptedException e) {
1293 if (Container.logger.isLoggable(Level.FINE)) {
1294 Container.logger.logp(Level.FINE, "Container", "waitForPendingSaves()", "exception ignored", e);
1295 }
1296 }
1297 }
1298
1299 if (Container.logger.isLoggable(Level.FINER)) {
1300 Container.logger.exiting("Container", "waitForPendingSaves()", "end");
1301 }
1302 }
1303
1304 /**
1305 * @see org.ov4j.IContainer#waitForPendingTasks()
1306 */
1307 public void waitForPendingTasks() {
1308 if (Container.logger.isLoggable(Level.FINER)) {
1309 Container.logger.entering("Container", "waitForPendingTasks()", "start");
1310 }
1311
1312 if (!taskExecutorStarted) {
1313 initTaskExecutor();
1314 synchronized (taskExecutor) {
1315 taskExecutorStarted = true;
1316 taskExecutor.start();
1317 }
1318 }
1319
1320 while (pendingTasks.size() > 0 || runningPendingTasks) {
1321 try {
1322 Thread.sleep(100);
1323 } catch (final InterruptedException e) {
1324 if (Container.logger.isLoggable(Level.FINE)) {
1325 Container.logger.logp(Level.FINE, "Container", "waitForPendingTasks()", "exception ignored", e);
1326 }
1327 }
1328 }
1329
1330 if (Container.logger.isLoggable(Level.FINER)) {
1331 Container.logger.exiting("Container", "waitForPendingTasks()", "end");
1332 }
1333 }
1334 }